Passed
Push — beta ( 72a57d...7d0ef0 )
by Dean
03:02
created

Main.queue()   D

Complexity

Conditions 11

Size

Total Lines 56

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 112.9185

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 56
ccs 1
cts 18
cp 0.0556
rs 4.4262
cc 11
crap 112.9185

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like Main.queue() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
from plugin.models import SyncResult
2 1
from plugin.modules.core.manager import ModuleManager
3 1
from plugin.preferences import Preferences
4 1
from plugin.sync.core.enums import SyncMedia
5 1
from plugin.sync.core.exceptions import QueueError
6 1
from plugin.sync.core.task import SyncTask
7 1
from plugin.sync.handlers import *
8 1
from plugin.sync.modes import *
9 1
from plugin.sync.triggers import LibraryUpdateTrigger
10
11 1
from datetime import datetime, timedelta
12 1
from threading import Lock, Thread
13 1
import logging
14 1
import Queue
15 1
import sys
16 1
import time
17
18 1
log = logging.getLogger(__name__)
19
20 1
HANDLERS = [
21
    Collection,
22
    List,
23
    Playback,
24
    Ratings,
25
    Watched
26
]
27
28 1
MODES = [
29
    FastPull,
30
    Full,
31
    Pull,
32
    Push
33
]
34
35
36 1
class Main(object):
37 1
    def __init__(self):
38 1
        self.current = None
39
40 1
        self._queue = Queue.PriorityQueue()
41 1
        self._queue_lock = Lock()
42
43 1
        self._spawn_lock = Lock()
44 1
        self._thread = None
45
46
        # Triggers
47 1
        self._library_update = LibraryUpdateTrigger(self)
48
49 1
    def queue(self, account, mode, data=None, media=SyncMedia.All, priority=10, trigger=SyncResult.Trigger.Manual, **kwargs):
50
        """Queue a sync for the provided account
51
52
        Note: if a sync is already queued for the provided account a `SyncError` will be raised.
53
54
        :param account: Account to synchronize with trakt
55
        :type account: int or plugin.models.Account
56
57
        :param mode: Syncing mode (pull, push, etc..)
58
        :type mode: int (plugin.sync.SyncMode)
59
60
        :param data: Data to synchronize (collection, ratings, etc..)
61
        :type data: int (plugin.sync.SyncData)
62
63
        :param media: Media to synchronize (movies, shows, etc..)
64
        :type media: int (plugin.sync.SyncMedia)
65
66
        :return: `SyncResult` object with details on the sync outcome.
67
        :rtype: plugin.sync.core.result.SyncResult
68
        """
69
        try:
70
            # Create new task
71
            task = SyncTask.create(account, mode, data, media, trigger, **kwargs)
72
        except Exception, ex:
73
            log.warn('Unable to construct task: %s', ex, exc_info=True)
74
            raise QueueError('Error', 'Unable to construct task: %s' % ex)
75
76
        with self._queue_lock:
77
            # Ensure we only have one task queued per account
78
            account_tasks = [
79
                t for (p, a, t) in self._queue.queue
80
                if (
81
                    a == task.account.id and
82
                    t.result and
83
                    (trigger != SyncResult.Trigger.Manual or t.result.trigger == trigger)
84
                )
85
            ]
86
87
            if len(account_tasks):
88
                raise QueueError("Unable to queue sync", "Sync has already been queued for this account")
89
90
            # Queue task until the thread is available
91
            self._queue.put((priority, task.account.id, task), block=False)
92
93
            # Ensure thread is active
94
            self.spawn()
95
96
        # Wait for task start
97
        for x in xrange(10):
98
            if task.started:
99
                log.debug('Task %r has started', task)
100
                return
101
102
            time.sleep(2)
103
104
        raise QueueError("Sync queued", "Sync will start once the currently queued tasks have finished")
105
106 1
    def spawn(self):
107
        """Ensure syncing thread has been spawned"""
108
        with self._spawn_lock:
109
            if self._thread is not None:
110
                return
111
112
            self._thread = Thread(target=self.run_wrapper)
113
            self._thread.start()
114
115
            log.debug('Spawned syncing thread: %r', self._thread)
116
117 1
    def run_wrapper(self):
118
        while True:
119
            try:
120
                # Retrieve task from queue
121
                try:
122
                    priority, account_id, task = self._queue.get(timeout=30)
123
                except Queue.Empty:
124
                    continue
125
126
                # Check if we should defer this task
127
                if self.should_defer(task):
128
                    # Re-queue sync task
129
                    if priority < 10000:
130
                        priority += 1
131
132
                    self._queue.put((priority, account_id, task), block=False)
133
134
                    # Wait 10 seconds
135
                    time.sleep(10)
136
                    continue
137
138
                # Select task
139
                self.current = task
140
            except Exception, ex:
141
                log.warn('Exception raised in run(): %s', ex, exc_info=True)
142
143
                time.sleep(30)
144
                continue
145
146
            # Start task
147
            try:
148
                log.info('(%r) Started', self.current.mode)
149
                self.current.started = True
150
151
                # Construct modes/handlers for task
152
                self.current.construct(HANDLERS, MODES)
153
154
                # Run in plex authorization context
155
                with self.current.account.plex.authorization():
156
                    # Run in trakt authorization context
157
                    with self.current.account.trakt.authorization():
158
                        # Run sync
159
                        self.run()
160
161
                self.current.success = True
162
            except Exception, ex:
163
                log.warn('Exception raised in run(): %s', ex, exc_info=True)
164
165
                self.current.exceptions.append(sys.exc_info())
166
                self.current.success = False
167
168
            try:
169
                # Sync task complete, run final tasks
170
                self.finish()
171
            except Exception, ex:
172
                log.error('Unable to run final sync tasks: %s', ex, exc_info=True)
173
174 1
    def should_defer(self, task):
175
        if task and task.result:
176
            # Ignore sync conditions on manual triggers
177
            if task.result.trigger == SyncResult.Trigger.Manual:
178
                return False
179
180
            # Ignore sync conditions if the task has been queued for over 12 hours
181
            started_ago = datetime.utcnow() - task.result.started_at
182
183
            if started_ago > timedelta(hours=12):
184
                log.debug('Task has been queued for over 12 hours, ignoring sync conditions')
185
                return False
186
187
        if Preferences.get('sync.idle_defer'):
188
            # Defer sync tasks until server finishes streaming (and is idle for 30 minutes)
189
            if ModuleManager['sessions'].is_streaming():
190
                log.debug('Deferring sync task, server is currently streaming media')
191
                return True
192
193
            if not ModuleManager['sessions'].is_idle():
194
                log.debug(
195
                    'Deferring sync task, server has been streaming media recently (in the last %d minutes)',
196
                    Preferences.get('sync.idle_delay')
197
                )
198
                return True
199
200
        return False
201
202 1
    def finish(self):
203
        # Cleanup `current` task
204
        current = self.current
205
        current.finish()
206
207
        # Task finished
208
        log.info('(%r) Done', current.mode)
209
210
        # Cleanup sync manager
211
        self.current = None
212
213 1
    def cancel(self, id):
214
        """Trigger a currently running sync to abort
215
216
        Note: A sync will only cancel at the next "safe" cancel point, this will not
217
        force a thread to end immediately.
218
219
        :return: `True` if a sync has been triggered to cancel,
220
                 `False` if there was no sync to cancel.
221
        :rtype: bool
222
        """
223
        current = self.current
224
225
        if current is None:
226
            # No active sync task
227
            return True
228
229
        if current.id != id:
230
            # Active task doesn't match `id`
231
            return False
232
233
        # Request task abort
234
        current.abort(timeout=10)
235
236
        log.info('(%r) Abort', current.mode)
237
        return True
238
239 1
    def run(self):
240
        # Trigger sync methods
241
        self._trigger([
242
            'construct',
243
            'start',
244
            'run',
245
            'stop'
246
        ])
247
248 1
    def _trigger(self, names):
249
        if self.current.mode not in self.current.modes:
250
            log.warn('Unknown sync mode: %r', self.current.mode)
251
            return
252
253
        mode = self.current.modes[self.current.mode]
254
255
        for name in names:
256
            func = getattr(mode, name, None)
257
258
            if not func:
259
                log.warn('Unknown method: %r', name)
260
                return
261
262
            func()
263
264
265
Sync = Main()
266