Passed
Push — develop ( e15ceb...5f9839 )
by Dean
02:51
created

Main.should_defer()   D

Complexity

Conditions 8

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 60.7343

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 27
ccs 1
cts 16
cp 0.0625
rs 4
cc 8
crap 60.7343
1 1
from plugin.models import SyncResult
2 1
from plugin.modules.core.manager import ModuleManager
3 1
from plugin.preferences import Preferences
4 1
from plugin.sync.core.enums import SyncMedia
5 1
from plugin.sync.core.exceptions import QueueError
6 1
from plugin.sync.core.task import SyncTask
7 1
from plugin.sync.handlers import *
8 1
from plugin.sync.modes import *
9 1
from plugin.sync.triggers import LibraryUpdateTrigger
10
11 1
from datetime import datetime, timedelta
12 1
from threading import Lock, Thread
13 1
import logging
14 1
import Queue
15 1
import sys
16 1
import time
17
18 1
# Module-level logger, named after this module
log = logging.getLogger(__name__)
19
20 1
# Handler classes handed to `task.construct()` by `Main.run_wrapper`.
# NOTE(review): these names presumably come from the `plugin.sync.handlers`
# wildcard import - confirm.
HANDLERS = [Collection, List, Playback, Ratings, Watched]
27
28 1
# Mode classes handed to `task.construct()` by `Main.run_wrapper`.
# NOTE(review): these names presumably come from the `plugin.sync.modes`
# wildcard import - confirm.
MODES = [FastPull, Full, Pull, Push]
34
35
36 1
class Main(object):
    """Sync manager: queues sync tasks and runs them sequentially on one worker thread.

    Tasks are stored in a priority queue as `(priority, account_id, task)` tuples, the
    worker thread is spawned lazily by `spawn()` and executes `run_wrapper()`.
    """

    def __init__(self):
        # Task currently selected by the worker thread (`None` when no task is active)
        self.current = None

        self._queue = Queue.PriorityQueue()
        self._queue_lock = Lock()

        self._spawn_lock = Lock()
        self._thread = None

        # Triggers
        self._library_update = LibraryUpdateTrigger(self)

    def queue(self, account, mode, data=None, media=SyncMedia.All, priority=10, trigger=SyncResult.Trigger.Manual, **kwargs):
        """Queue a sync for the provided account, then wait (up to ~20 seconds) for it to start

        Note: if a matching sync is already queued for the provided account a `QueueError` will be raised.

        :param account: Account to synchronize with trakt
        :type account: int or plugin.models.Account

        :param mode: Syncing mode (pull, push, etc..)
        :type mode: int (plugin.sync.SyncMode)

        :param data: Data to synchronize (collection, ratings, etc..)
        :type data: int (plugin.sync.SyncData)

        :param media: Media to synchronize (movies, shows, etc..)
        :type media: int (plugin.sync.SyncMedia)

        :param priority: Priority used to order the task in the queue
        :type priority: int

        :param trigger: Trigger that requested this sync
        :type trigger: int (plugin.models.SyncResult.Trigger)

        :param kwargs: Extra arguments passed through to `SyncTask.create`

        :return: `None`, once the task has been started by the syncing thread.

        :raises QueueError: if the task couldn't be constructed, a matching sync is already
                            queued for the account, or the task hasn't started before the
                            wait period expires (the task remains queued in that case).
        """
        try:
            # Create new task
            task = SyncTask.create(account, mode, data, media, trigger, **kwargs)
        except Exception as ex:
            log.warning('Unable to construct task: %s', ex, exc_info=True)
            raise QueueError('Error', 'Unable to construct task: %s' % ex)

        with self._queue_lock:
            # Ensure we only have one task queued per account.
            #
            # Non-manual requests conflict with any queued task for the account, manual
            # requests only conflict with queued tasks that were also manually triggered.
            already_queued = any(
                (
                    a == task.account.id and
                    t.result and
                    (trigger != SyncResult.Trigger.Manual or t.result.trigger == trigger)
                )
                for (p, a, t) in self._queue.queue
            )

            if already_queued:
                raise QueueError("Unable to queue sync", "Sync has already been queued for this account")

            # Queue task until the thread is available
            self._queue.put((priority, task.account.id, task), block=False)

            # Ensure thread is active
            self.spawn()

        # Wait for task start (10 checks, 2 seconds apart)
        for _ in range(10):
            if task.started:
                log.debug('Task %r has started', task)
                return

            time.sleep(2)

        # Task is still queued, let the caller know it will run later
        raise QueueError("Sync queued", "Sync will start once the currently queued tasks have finished")

    def spawn(self):
        """Ensure syncing thread has been spawned"""
        with self._spawn_lock:
            if self._thread is not None:
                # Thread has already been created
                return

            self._thread = Thread(target=self.run_wrapper)
            self._thread.start()

            log.debug('Spawned syncing thread: %r', self._thread)

    def run_wrapper(self):
        """Syncing thread entry point: retrieve queued tasks and run them, forever.

        Each iteration retrieves a task from the queue, re-queues it (with a lower priority)
        if `should_defer()` requests it, otherwise runs the task and then calls `finish()`.
        Exceptions are logged and never propagate out of the loop.
        """
        while True:
            try:
                # Retrieve task from queue
                try:
                    priority, account_id, task = self._queue.get(timeout=30)
                except Queue.Empty:
                    continue

                # Check if we should defer this task
                if self.should_defer(task):
                    # Re-queue sync task (pushed back in the queue, priority is capped at 10000)
                    if priority < 10000:
                        priority += 1

                    self._queue.put((priority, account_id, task), block=False)

                    # Wait 10 seconds
                    time.sleep(10)
                    continue

                # Select task
                self.current = task
            except Exception as ex:
                # NOTE(review): if the failure happened after the task was retrieved
                # (e.g. inside `should_defer`), the task is not returned to the queue.
                log.warning('Exception raised in run(): %s', ex, exc_info=True)

                time.sleep(30)
                continue

            # Start task
            try:
                log.info('(%r) Started', self.current.mode)
                self.current.started = True

                # Construct modes/handlers for task
                self.current.construct(HANDLERS, MODES)

                # Run in plex authorization context
                with self.current.account.plex.authorization():
                    # Run in trakt authorization context
                    with self.current.account.trakt.authorization():
                        # Run sync
                        self.run()

                self.current.success = True
            except Exception as ex:
                log.warning('Exception raised in run(): %s', ex, exc_info=True)

                # Store exception details on the task
                self.current.exceptions.append(sys.exc_info())
                self.current.success = False

            try:
                # Sync task complete, run final tasks
                self.finish()
            except Exception as ex:
                log.error('Unable to run final sync tasks: %s', ex, exc_info=True)

    def should_defer(self, task):
        """Check if the provided task should be deferred (re-queued and retried later)

        Manually triggered tasks, and tasks whose result was started over 12 hours ago are
        never deferred. Otherwise (when the `sync.idle_defer` preference is enabled) tasks
        are deferred while the sessions module reports the server is streaming, or not idle.

        :param task: Task retrieved from the queue

        :return: `True` if the task should be deferred, `False` if it can run now.
        :rtype: bool
        """
        if task and task.result:
            # Ignore sync conditions on manual triggers
            if task.result.trigger == SyncResult.Trigger.Manual:
                return False

            # Ignore sync conditions if the task has been queued for over 12 hours
            started_ago = datetime.utcnow() - task.result.started_at

            if started_ago > timedelta(hours=12):
                log.debug('Task has been queued for over 12 hours, ignoring sync conditions')
                return False

        if Preferences.get('sync.idle_defer'):
            # Defer sync tasks until server finishes streaming (and is idle for 30 minutes)
            if ModuleManager['sessions'].is_streaming():
                log.debug('Deferring sync task, server is currently streaming media')
                return True

            if not ModuleManager['sessions'].is_idle():
                log.debug(
                    'Deferring sync task, server has been streaming media recently (in the last %d minutes)',
                    Preferences.get('sync.idle_delay')
                )
                return True

        return False

    def finish(self):
        """Run final tasks for the `current` task, then clear it from the manager.

        `self.current` is always reset to `None`, even if the task's `finish()` raises, so a
        failed cleanup can't leave a stale task visible to `cancel()`. The exception (if any)
        still propagates to the caller.
        """
        current = self.current

        try:
            # Cleanup `current` task
            current.finish()

            # Task finished
            log.info('(%r) Done', current.mode)
        finally:
            # Cleanup sync manager
            self.current = None

    def cancel(self, id):
        """Trigger a currently running sync to abort

        Note: A sync will only cancel at the next "safe" cancel point, this will not
        force a thread to end immediately.

        :param id: Identifier of the task to cancel (compared against `current.id`)

        :return: `True` if the active sync has been triggered to cancel, or if there is no
                 active sync task,
                 `False` if the active sync task doesn't match `id`.
        :rtype: bool
        """
        current = self.current

        if current is None:
            # No active sync task
            return True

        if current.id != id:
            # Active task doesn't match `id`
            return False

        # Request task abort
        current.abort(timeout=10)

        log.info('(%r) Abort', current.mode)
        return True

    def run(self):
        """Run the current task by triggering each sync method (in order) on its mode"""
        # Trigger sync methods
        self._trigger([
            'construct',
            'start',
            'run',
            'stop'
        ])

    def _trigger(self, names):
        """Call each named method on the mode object selected for the current task

        Nothing is called if the task's mode isn't in `current.modes`, and processing stops
        at the first name that doesn't resolve to a (truthy) attribute on the mode.

        :param names: Method names to call, in order
        :type names: list of str
        """
        if self.current.mode not in self.current.modes:
            log.warning('Unknown sync mode: %r', self.current.mode)
            return

        mode = self.current.modes[self.current.mode]

        for name in names:
            func = getattr(mode, name, None)

            if not func:
                log.warning('Unknown method: %r', name)
                return

            func()
263
264
265
# Module-level `Main` instance, created when this module is imported
Sync = Main()
266