Test Failed
Push — develop ( a80574...5ed66c )
by Dean
02:36
created

Main.queue()   F

Complexity

Conditions 13

Size

Total Lines 62

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 161.7274

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 62
ccs 1
cts 24
cp 0.0417
rs 2.8934
cc 13
crap 161.7274

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into its own, well-named method.

Complexity

Complex code units like Main.queue() often do a lot of different things. To break one down, we need to identify a cohesive component within the enclosing class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
from plugin.core.message import InterfaceMessages
2 1
from plugin.models import SyncResult
3 1
from plugin.modules.core.manager import ModuleManager
4 1
from plugin.preferences import Preferences
5 1
from plugin.sync.core.enums import SyncMedia
6 1
from plugin.sync.core.exceptions import QueueError
7 1
from plugin.sync.core.task import SyncTask
8 1
from plugin.sync.handlers import *
9 1
from plugin.sync.modes import *
10
from plugin.sync.triggers import LibraryUpdateTrigger
11 1
12 1
from datetime import datetime, timedelta
13 1
from plex_database.library import TZ_LOCAL
14 1
from threading import Lock, Thread
15 1
import logging
16 1
import Queue
17
import sys
18 1
import time
19
20 1
log = logging.getLogger(__name__)

# Handler classes handed to `SyncTask.construct()` by the syncing thread
HANDLERS = [Collection, List, Playback, Ratings, Watched]

# Mode classes handed to `SyncTask.construct()` by the syncing thread
MODES = [FastPull, Full, Pull, Push]

# Without a system timezone `Main.queue()` refuses to queue tasks, so report
# the problem through the interface as soon as the module is imported
if TZ_LOCAL is None:
    InterfaceMessages.add(
        logging.ERROR,
        'Unable to retrieve system timezone, syncing will not be available'
    )
40 1
41 1
42
class Main(object):
43 1
    def __init__(self):
        """Set up an idle sync manager (no worker thread, empty task queue)."""
        # Worker thread (created on demand by `spawn()`, guarded by `_spawn_lock`)
        self._thread = None
        self._spawn_lock = Lock()

        # Pending tasks, stored as `(priority, account_id, task)` tuples;
        # `_queue_lock` is held by `queue()` while it checks and enqueues
        self._queue_lock = Lock()
        self._queue = Queue.PriorityQueue()

        # Task currently being processed by the worker thread
        self.current = None

        # Triggers
        self._library_update = LibraryUpdateTrigger(self)
54
55
    def queue(self, account, mode, data=None, media=SyncMedia.All, priority=10, trigger=SyncResult.Trigger.Manual, **kwargs):
        """Queue a sync for the provided account and wait briefly for it to start.

        Note: the "not started yet" outcome is also reported by raising `QueueError`,
        in that case the task remains queued and will run later.

        :param account: Account to synchronize with trakt
        :type account: int or plugin.models.Account

        :param mode: Syncing mode (pull, push, etc..)
        :type mode: int (plugin.sync.SyncMode)

        :param data: Data to synchronize (collection, ratings, etc..)
        :type data: int (plugin.sync.SyncData)

        :param media: Media to synchronize (movies, shows, etc..)
        :type media: int (plugin.sync.SyncMedia)

        :param priority: Position in the priority queue (lower values are retrieved first)
        :type priority: int

        :param trigger: Source of the sync request, used when checking for already queued tasks
        :type trigger: int (SyncResult.Trigger)

        :param kwargs: Extra keyword arguments passed through to `SyncTask.create()`

        :raises QueueError: if a critical interface message is active, the system timezone
                            is unavailable, the task couldn't be constructed, a conflicting
                            task is already queued for the account, or the task hasn't
                            started after roughly 10 seconds.

        :return: `None`, once the task has been flagged as started.
        """
        if InterfaceMessages.critical:
            raise QueueError('Error', InterfaceMessages.message)

        if TZ_LOCAL is None:
            raise QueueError('Error', 'Unable to retrieve system timezone')

        task = self._create_task(account, mode, data, media, trigger, **kwargs)

        with self._queue_lock:
            # Ensure we only have one task queued per account
            if self._has_conflicting_task(task, trigger):
                raise QueueError("Unable to queue sync", "Sync has already been queued for this account")

            # Queue task until the thread is available
            self._queue.put((priority, task.account.id, task), block=False)

            # Ensure thread is active
            self.spawn()

        self._wait_for_start(task)

    def _create_task(self, account, mode, data, media, trigger, **kwargs):
        """Build the `SyncTask`, converting any construction failure into a `QueueError`."""
        try:
            return SyncTask.create(account, mode, data, media, trigger, **kwargs)
        except Exception as ex:
            log.warning('Unable to construct task: %s', ex, exc_info=True)
            raise QueueError('Error', 'Unable to construct task: %s' % ex)

    def _has_conflicting_task(self, task, trigger):
        """Return `True` if a queued task for the same account blocks `task` from being queued.

        Manual requests only conflict with queued tasks that were also triggered manually,
        any other trigger conflicts with every queued task (that has a result) for the account.
        """
        for _, account_id, queued in self._queue.queue:
            if account_id != task.account.id or not queued.result:
                continue

            if trigger != SyncResult.Trigger.Manual or queued.result.trigger == trigger:
                return True

        return False

    def _wait_for_start(self, task):
        """Poll `task.started` once per second (10 sleeps at most), raise `QueueError` if it never starts."""
        sleeps_left = 10

        while not task.started:
            if sleeps_left <= 0:
                raise QueueError("Sync queued", "Sync will start once the currently queued tasks have finished")

            time.sleep(1)
            sleeps_left -= 1

        # `started` is checked again after the final sleep, so a task picked up
        # during the last second is still reported as started
        log.debug('Task %r has started', task)
117 1
118
    def spawn(self):
119
        """Ensure syncing thread has been spawned"""
120
        with self._spawn_lock:
121
            if self._thread is not None:
122
                return
123
124
            self._thread = Thread(target=self.run_wrapper)
125
            self._thread.start()
126
127
            log.debug('Spawned syncing thread: %r', self._thread)
128
129
    def run_wrapper(self):
        """Worker loop of the syncing thread, this method never returns.

        Each iteration retrieves the next `(priority, account_id, task)` tuple from the
        queue, re-queues it if `should_defer()` says so, otherwise runs it as
        `self.current` and finally calls `finish()`. Exceptions are logged rather
        than propagated, so the loop keeps running.
        """
        while True:
            try:
                # Retrieve task from queue (wakes up every 30 seconds while empty)
                try:
                    priority, account_id, task = self._queue.get(timeout=30)
                except Queue.Empty:
                    continue

                # Check if we should defer this task
                if self.should_defer(task):
                    # Re-queue sync task, a higher value moves it behind tasks
                    # queued with the previous value (capped at 10000)
                    if priority < 10000:
                        priority += 1

                    self._queue.put((priority, account_id, task), block=False)

                    # Wait 10 seconds
                    time.sleep(10)
                    continue

                # Select task
                self.current = task
            except Exception as ex:
                log.warning('Exception raised in run(): %s', ex, exc_info=True)

                # Back off before retrying
                time.sleep(30)
                continue

            # Start task
            try:
                log.info('(%r) Started', self.current.mode)
                # `queue()` polls this flag while waiting for the task to start
                self.current.started = True

                # Construct modes/handlers for task
                self.current.construct(HANDLERS, MODES)

                # Run in plex authorization context
                with self.current.account.plex.authorization():
                    # Run in trakt authorization context
                    with self.current.account.trakt.authorization():
                        # Run sync
                        self.run()

                self.current.success = True
            except Exception as ex:
                log.warning('Exception raised in run(): %s', ex, exc_info=True)

                # Record the failure on the task
                self.current.exceptions.append(sys.exc_info())
                self.current.success = False

            try:
                # Sync task complete, run final tasks
                self.finish()
            except Exception as ex:
                log.error('Unable to run final sync tasks: %s', ex, exc_info=True)
185
186
    def should_defer(self, task):
        """Decide whether `task` should be re-queued instead of being run now.

        A task with a result is never deferred when it was triggered manually, or
        when its result's `started_at` is more than 12 hours in the past. Otherwise,
        if the 'sync.idle_defer' preference is enabled, the task is deferred while
        the sessions module reports streaming, or reports that it isn't idle.

        :return: `True` if the task should be deferred, `False` to run it now.
        :rtype: bool
        """
        result = task.result if task else None

        if result:
            # Manual triggers bypass the sync conditions
            if result.trigger == SyncResult.Trigger.Manual:
                return False

            # So do tasks that have been waiting for more than 12 hours
            waiting_for = datetime.utcnow() - result.started_at

            if waiting_for > timedelta(hours=12):
                log.debug('Task has been queued for over 12 hours, ignoring sync conditions')
                return False

        if not Preferences.get('sync.idle_defer'):
            # Idle deferral is disabled
            return False

        # Hold tasks back while the server is streaming
        if ModuleManager['sessions'].is_streaming():
            log.debug('Deferring sync task, server is currently streaming media')
            return True

        if ModuleManager['sessions'].is_idle():
            return False

        # Streaming has stopped, but the server isn't considered idle yet
        log.debug(
            'Deferring sync task, server has been streaming media recently (in the last %d minutes)',
            Preferences.get('sync.idle_delay')
        )
        return True
213 1
214
    def finish(self):
215
        # Cleanup `current` task
216
        current = self.current
217
        current.finish()
218
219
        # Task finished
220
        log.info('(%r) Done', current.mode)
221
222
        # Cleanup sync manager
223
        self.current = None
224
225
    def cancel(self, id):
226
        """Trigger a currently running sync to abort
227
228
        Note: A sync will only cancel at the next "safe" cancel point, this will not
229
        force a thread to end immediately.
230
231
        :return: `True` if a sync has been triggered to cancel,
232
                 `False` if there was no sync to cancel.
233
        :rtype: bool
234
        """
235
        current = self.current
236
237
        if current is None:
238
            # No active sync task
239 1
            return True
240
241
        if current.id != id:
242
            # Active task doesn't match `id`
243
            return False
244
245
        # Request task abort
246
        current.abort(timeout=10)
247
248
        log.info('(%r) Abort', current.mode)
249 1
        return True
250
251
    def run(self):
252
        # Trigger sync methods
253
        self._trigger([
254
            'construct',
255
            'start',
256
            'run',
257
            'finish',
258
            'stop'
259
        ])
260
261
    def _trigger(self, names):
262
        if self.current.mode not in self.current.modes:
263
            log.warn('Unknown sync mode: %r', self.current.mode)
264
            return
265
266 1
        mode = self.current.modes[self.current.mode]
267
268
        for name in names:
269
            func = getattr(mode, name, None)
270
271
            if not func:
272
                log.warn('Unknown method: %r', name)
273
                return
274
275
            func()
276
277
278
# Module-level `Main` instance, created when this module is imported
Sync = Main()
279