Passed
Push — master ( aeb165...d9ae97 )
by Dean
03:04
created

Main.queue()   D

Complexity

Conditions 8

Size

Total Lines 49

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 61.9073
Metric Value
dl 0
loc 49
ccs 1
cts 18
cp 0.0556
rs 4.7619
cc 8
crap 61.9073
1 1
from threading import Thread
2 1
from plugin.models import SyncResult
3 1
from plugin.sync.core.enums import SyncMedia
4 1
from plugin.sync.core.exceptions import QueueError
5 1
from plugin.sync.core.task import SyncTask
6 1
from plugin.sync.handlers import *
7 1
from plugin.sync.modes import *
8 1
from plugin.sync.triggers import LibraryUpdateTrigger
9
10 1
from threading import Lock
11 1
import logging
12 1
import Queue
13 1
import sys
14 1
import time
15
16 1
log = logging.getLogger(__name__)

# Handler classes handed to `SyncTask.construct()` by the syncing thread
HANDLERS = [Collection, List, Playback, Ratings, Watched]

# Mode classes handed to `SyncTask.construct()` by the syncing thread
MODES = [FastPull, Full, Pull, Push]
32
33
34 1
class Main(object):
    """Sync manager: queues sync tasks and runs them one at a time on a worker thread."""

    def __init__(self):
        # Task currently held by the worker thread (`None` until a task is retrieved)
        self.current = None

        # Pending `(priority, account id, task)` entries; the lock serialises the
        # "already queued?" check and the insert performed by `queue()`
        self._queue = Queue.PriorityQueue()
        self._queue_lock = Lock()

        # Worker thread, created on demand by `spawn()`
        self._spawn_lock = Lock()
        self._thread = None

        # Triggers
        self._library_update = LibraryUpdateTrigger(self)

    def queue(self, account, mode, data=None, media=SyncMedia.All, priority=10, trigger=SyncResult.Trigger.Manual, **kwargs):
        """Queue a sync for the provided account

        Note: if a sync is already queued for the provided account a `QueueError` will be raised.

        :param account: Account to synchronize with trakt
        :type account: int or plugin.models.Account

        :param mode: Syncing mode (pull, push, etc..)
        :type mode: int (plugin.sync.SyncMode)

        :param data: Data to synchronize (collection, ratings, etc..)
        :type data: int (plugin.sync.SyncData)

        :param media: Media to synchronize (movies, shows, etc..)
        :type media: int (plugin.sync.SyncMedia)

        :param priority: Priority used to order the task in the queue (lowest value is retrieved first)
        :type priority: int

        :param trigger: Trigger value passed through to `SyncTask.create`

        :param kwargs: Extra keyword arguments passed through to `SyncTask.create`

        :return: `None`, once the syncing thread has marked the task as started

        :raises QueueError: if the task couldn't be constructed, if a sync is already queued for
                            the account, or if the task hasn't started after waiting roughly
                            three seconds (the task stays in the queue in that last case)
        """
        try:
            # Create new task
            task = SyncTask.create(account, mode, data, media, trigger, **kwargs)
        except Exception as ex:
            log.warning('Unable to construct task: %s', ex, exc_info=True)
            raise QueueError('Error', 'Unable to construct task: %s' % ex)

        account_id = task.account.id

        with self._queue_lock:
            # Ensure we only have one task queued per account
            if any(entry[1] == account_id for entry in self._queue.queue):
                raise QueueError("Unable to queue sync", "Sync has already been queued for this account")

            # Queue task until the thread is available
            self._queue.put((priority, account_id, task), block=False)

            # Ensure thread is active
            self.spawn()

        # Wait for task start (poll once per second, up to three times)
        for _ in range(3):
            if task.started:
                log.debug('Task %r has started', task)
                return

            time.sleep(1)

        # Task is still waiting behind other work, report that to the caller
        raise QueueError("Sync queued", "Sync will start once the currently queued tasks have finished")

    def spawn(self):
        """Ensure syncing thread has been spawned

        A new thread is started when none exists yet, or when the previous one is
        no longer alive (e.g. it died from an unexpected error).
        """
        with self._spawn_lock:
            if self._thread is not None and self._thread.is_alive():
                return

            self._thread = Thread(target=self.run_wrapper)
            self._thread.start()

            log.debug('Spawned syncing thread: %r', self._thread)

    def run_wrapper(self):
        """Worker loop for the syncing thread, this method never returns

        Retrieves tasks from the queue one at a time, runs each inside the account's
        plex and trakt authorization contexts, records the outcome on the task and
        then runs the final tasks via `finish()`.
        """
        while True:
            # Retrieve task from queue (wait again if nothing arrived within 30 seconds)
            try:
                _, _, self.current = self._queue.get(timeout=30)
            except Queue.Empty:
                continue

            # Start task
            log.info('(%r) Started', self.current.mode)
            self.current.started = True

            try:
                # Construct modes/handlers for task
                self.current.construct(HANDLERS, MODES)

                # Run in plex authorization context
                with self.current.account.plex.authorization():
                    # Run in trakt authorization context
                    with self.current.account.trakt.authorization():
                        # Run sync
                        self.run()

                self.current.success = True
            except Exception as ex:
                log.warning('Exception raised in run(): %s', ex, exc_info=True)

                # Keep the exception details on the task and flag it as failed
                self.current.exceptions.append(sys.exc_info())
                self.current.success = False

            try:
                # Sync task complete, run final tasks
                self.finish()
            except Exception as ex:
                log.error('Unable to run final sync tasks: %s', ex, exc_info=True)

    def finish(self):
        """Run the current task's `finish()` and clear `self.current`

        `self.current` is reset even when the task's `finish()` raises, so a failed
        cleanup doesn't leave a finished task looking like the active one.
        """
        current = self.current

        try:
            # Cleanup `current` task
            current.finish()

            # Task finished
            log.info('(%r) Done', current.mode)
        finally:
            # Cleanup sync manager
            self.current = None

    def cancel(self, id):
        """Trigger a currently running sync to abort

        Note: A sync will only cancel at the next "safe" cancel point, this will not
        force a thread to end immediately.

        :param id: Identifier of the task to cancel (compared against the active task's `id`)

        :return: `True` if there is no active sync or the active sync has been asked to abort,
                 `False` if the active sync doesn't match `id`.
        :rtype: bool
        """
        current = self.current

        if current is None:
            # No active sync task
            return True

        if current.id != id:
            # Active task doesn't match `id`
            return False

        # Request task abort
        current.abort(timeout=10)

        log.info('(%r) Abort', current.mode)
        return True

    def run(self):
        """Run the current task by triggering the sync methods on its mode, in order"""
        self._trigger([
            'construct',
            'start',
            'run',
            'stop'
        ])

    def _trigger(self, names):
        """Call each method in `names`, in order, on the mode selected by the current task

        Nothing is called when the task's mode isn't present in `self.current.modes`,
        and processing stops at the first name the mode doesn't provide.

        :param names: Method names to call on the mode
        :type names: list of str
        """
        if self.current.mode not in self.current.modes:
            log.warning('Unknown sync mode: %r', self.current.mode)
            return

        mode = self.current.modes[self.current.mode]

        for name in names:
            func = getattr(mode, name, None)

            if not func:
                log.warning('Unknown method: %r', name)
                return

            func()
205
206
207
# Module-level `Main` instance, constructed when this module is imported
Sync = Main()
208