Main.queue()   F

Complexity

Conditions 13

Size

Total Lines 64

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 159.9651

Importance

Changes 4
Bugs 0 Features 0
Metric  Value
dl      0
loc     64
ccs     1
cts     22
cp      0.0455
rs      2.7658
c       4
b       0
f       0
cc      13
crap    159.9651

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
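As a rough illustration of this comment-driven extraction, the sketch below takes an inline block that would sit under a comment such as "Ensure we only have one task queued per account" and moves it into a method named after that comment. The names `TaskQueue`, `_has_queued_task` and `put`, the stand-in `QueueError`, and the simplified `(priority, account_id, task)` tuples are assumptions made for this example and are not part of the analyzed code. It is written for Python 3, where the module is called `queue`.

```python
import queue
from threading import Lock


class QueueError(Exception):
    """Stand-in for the plugin's QueueError (hypothetical, simplified)."""


class TaskQueue:
    def __init__(self):
        self._queue = queue.PriorityQueue()
        self._queue_lock = Lock()

    def _has_queued_task(self, account_id):
        # Previously an inline block explained by a comment; the comment
        # became the starting point for the method name.
        return any(
            queued_account == account_id
            for (_priority, queued_account, _task) in self._queue.queue
        )

    def put(self, account_id, task, priority=10):
        with self._queue_lock:
            if self._has_queued_task(account_id):
                raise QueueError("Sync has already been queued for this account")

            self._queue.put((priority, account_id, task), block=False)
```

The calling method now reads as a sequence of named steps, and the duplicate check can be tested on its own.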

Commonly applied refactorings include Extract Method.

Complexity

Complex code units like Main.queue() often do a lot of different things. To break down the class that contains such a unit, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often quicker to apply.
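A minimal sketch of Extract Class, assuming the thread-related members of a class like `Main` (a spawn lock, a thread reference and a `spawn()` method, which share the "spawn" prefix) form one cohesive component. The class name `WorkerSpawner`, the method `ensure_started`, its boolean return value and the example-only `ran` list are all hypothetical and not taken from the analyzed code.

```python
from threading import Lock, Thread


class WorkerSpawner:
    """Hypothetical extracted class that owns the worker thread and its lock."""

    def __init__(self, target):
        self._target = target
        self._lock = Lock()
        self._thread = None

    def ensure_started(self):
        """Start the worker thread once; later calls do nothing and return False."""
        with self._lock:
            if self._thread is not None:
                return False

            self._thread = Thread(target=self._target, daemon=True)
            self._thread.start()
            return True


class Main:
    def __init__(self):
        self.ran = []  # example-only record that the worker body executed
        self._spawner = WorkerSpawner(self.run_wrapper)

    def spawn(self):
        # The public method stays, but delegates to the extracted component.
        return self._spawner.ensure_started()

    def run_wrapper(self):
        self.ran.append(True)
```

After the extraction, `Main` no longer manages the lock or the thread reference directly, which removes two fields and one locking concern from the larger class.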

from plugin.core.message import InterfaceMessages
from plugin.models import SyncResult
from plugin.modules.core.manager import ModuleManager
from plugin.preferences import Preferences
from plugin.sync.core.enums import SyncMedia
from plugin.sync.core.exceptions import QueueError
from plugin.sync.core.task import SyncTask
from plugin.sync.handlers import *
Coding Style: The usage of wildcard imports like plugin.sync.handlers should generally be avoided.
from plugin.sync.modes import *
Coding Style: The usage of wildcard imports like plugin.sync.modes should generally be avoided.
from plugin.sync.triggers import LibraryUpdateTrigger

from datetime import datetime, timedelta
from plex_database.library import TZ_LOCAL
from threading import Lock, Thread
import logging
import Queue
import sys
import time

log = logging.getLogger(__name__)

HANDLERS = [
    Collection,
    List,
    Playback,
    Ratings,
    Watched
]

MODES = [
    FastPull,
    Full,
    Pull,
    Push
]

# Display error if the system timezone is not available
if TZ_LOCAL is None:
    InterfaceMessages.add(logging.ERROR, 'Unable to retrieve system timezone, syncing will not be available')


class Main(object):
    def __init__(self):
        self.current = None

        self._queue = Queue.PriorityQueue()
        self._queue_lock = Lock()

        self._spawn_lock = Lock()
        self._thread = None

        # Triggers
        self._library_update = LibraryUpdateTrigger(self)

    def queue(self, account, mode, data=None, media=SyncMedia.All, priority=10, trigger=SyncResult.Trigger.Manual, **kwargs):
Coding Style: This line is too long as per the coding-style (125/120).

This check looks for lines that are too long. You can specify the maximum line length.
        """Queue a sync for the provided account

        Note: if a sync is already queued for the provided account a `QueueError` will be raised.

        :param account: Account to synchronize with trakt
        :type account: int or plugin.models.Account

        :param mode: Syncing mode (pull, push, etc..)
        :type mode: int (plugin.sync.SyncMode)

        :param data: Data to synchronize (collection, ratings, etc..)
        :type data: int (plugin.sync.SyncData)

        :param media: Media to synchronize (movies, shows, etc..)
        :type media: int (plugin.sync.SyncMedia)

        :return: `SyncResult` object with details on the sync outcome.
        :rtype: plugin.sync.core.result.SyncResult
        """
        if InterfaceMessages.critical:
Bug: The Class InterfaceMessages does not seem to have a member named critical.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.
            raise QueueError('Error', InterfaceMessages.message)
Bug: The Class InterfaceMessages does not seem to have a member named message.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

        if TZ_LOCAL is None:
            raise QueueError('Error', 'Unable to retrieve system timezone')

        try:
            # Create new task
            task = SyncTask.create(account, mode, data, media, trigger, **kwargs)
        except Exception as ex:
            log.warn('Unable to construct task: %s', ex, exc_info=True)

            # Raise as queue error
            raise QueueError('Error', 'Unable to construct task: %s' % ex)

        with self._queue_lock:
            # Ensure we only have one task queued per account
            account_tasks = [
                t for (p, a, t) in self._queue.queue
                if (
                    a == task.account.id and
                    t.result and
                    (trigger != SyncResult.Trigger.Manual or t.result.trigger == trigger)
                )
            ]

            if len(account_tasks):
                raise QueueError("Unable to queue sync", "Sync has already been queued for this account")

            # Queue task until the thread is available
            self._queue.put((priority, task.account.id, task), block=False)

            # Ensure thread is active
            self.spawn()

        # Wait for task start
        for x in xrange(10):
Comprehensibility Best Practice: Undefined variable 'xrange'
Unused Code: The variable x seems to be unused.
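Both findings can be addressed together. On Python 2 `xrange` is a built-in, so the "undefined variable" report presumably comes from analyzing the file against Python 3, where only `range` exists. The sketch below is one possible Python 3 form of the wait loop, using `_` to mark the counter as intentionally unused. The helper name `wait_for_start`, its `attempts` and `interval` parameters, and the `FakeTask` stand-in are assumptions for illustration and are not part of the analyzed module.

```python
import time


class FakeTask:
    """Hypothetical stand-in exposing only the `started` flag."""

    def __init__(self, started=False):
        self.started = started


def wait_for_start(task, attempts=10, interval=1.0):
    """Poll `task.started` up to `attempts` times; return True once it is set."""
    for _ in range(attempts):  # `_` signals that the counter is not used
        if task.started:
            return True

        time.sleep(interval)

    return False
```

Returning a boolean instead of raising keeps the helper easy to test; the caller can still raise its own error when the result is `False`.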
            if task.started:
                log.debug('Task %r has started', task)
                return

            time.sleep(1)

        raise QueueError("Sync queued", "Sync will start once the currently queued tasks have finished")

    def spawn(self):
        """Ensure syncing thread has been spawned"""
        with self._spawn_lock:
            if self._thread is not None:
                return

            self._thread = Thread(target=self.run_wrapper)
            self._thread.start()

            log.debug('Spawned syncing thread: %r', self._thread)

    def run_wrapper(self):
        while True:
            try:
                # Retrieve task from queue
                try:
                    priority, account_id, task = self._queue.get(timeout=30)
                except Queue.Empty:
                    continue

                # Check if we should defer this task
                if self.should_defer(task):
                    # Re-queue sync task
                    if priority < 10000:
                        priority += 1

                    self._queue.put((priority, account_id, task), block=False)

                    # Wait 10 seconds
                    time.sleep(10)
                    continue

                # Select task
                self.current = task
            except Exception as ex:
Best Practice: Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensures that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.
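A minimal sketch of the narrower handling this check recommends, assuming the only expected failure when fetching a task is an empty queue. The function name `next_task` and its `timeout` default are hypothetical. The example targets Python 3, where the module is named `queue`.

```python
import queue


def next_task(tasks, timeout=0.01):
    """Return the next (priority, account_id, task) tuple, or None if none is queued."""
    try:
        return tasks.get(timeout=timeout)
    except queue.Empty:
        # Expected condition: nothing is queued yet, so the caller can retry.
        return None
    # Any other exception propagates to the caller instead of being hidden.
```

In a long-running worker loop, a broad `except Exception` at the outermost level may still be a deliberate choice to keep the thread alive. In that case the narrow handlers cover the expected errors and the broad one is reserved for logging unexpected ones.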
                log.warn('Exception raised while attempting to retrieve sync task from queue: %s', ex, exc_info=True)

                time.sleep(30)
                continue

            # Start task
            try:
                log.info('(%r) Started', self.current.mode)
                self.current.started = True

                # Construct modes/handlers for task
                self.current.construct(HANDLERS, MODES)

                # Run in plex authorization context
                with self.current.account.plex.authorization():
                    # Run in trakt authorization context
                    with self.current.account.trakt.authorization():
                        # Run sync
                        self.run()

                self.current.success = True
            except Exception as ex:
Best Practice: Catching very general exceptions such as Exception is usually not recommended.
                log.warn('Exception raised in sync task: %s', ex, exc_info=True)

                self.current.exceptions.append(sys.exc_info())
                self.current.success = False

            try:
                # Sync task complete, run final tasks
                self.finish()
            except Exception as ex:
Best Practice: Catching very general exceptions such as Exception is usually not recommended.
                log.error('Exception raised while attempting to finish sync task: %s', ex, exc_info=True)

    def should_defer(self, task):
Coding Style: This method could be written as a function/class method.

If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example,

class Foo:
    def some_method(self, x, y):
        return x + y

could be written as

class Foo:
    @classmethod
    def some_method(cls, x, y):
        return x + y
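The note mentions a static method, while its example uses `@classmethod`. For a method that needs neither the instance nor the class, a `@staticmethod` variant is a closer fit. The sketch below reuses the `Foo` example from the note and is only an illustration, not a change taken from the analyzed code.

```python
class Foo:
    @staticmethod
    def some_method(x, y):
        # No instance or class state is used, so neither `self` nor `cls` is needed.
        return x + y
```

A static method can be called on the class or on an instance, so existing call sites such as `self.should_defer(task)` would keep working after the same change.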
        if task and task.result:
            # Ignore sync conditions on manual triggers
            if task.result.trigger == SyncResult.Trigger.Manual:
                return False

            # Ignore sync conditions if the task has been queued for over 12 hours
            started_ago = datetime.utcnow() - task.result.started_at

            if started_ago > timedelta(hours=12):
                log.debug('Task has been queued for over 12 hours, ignoring sync conditions')
                return False

        if Preferences.get('sync.idle_defer'):
            # Defer sync tasks until server finishes streaming (and is idle for 30 minutes)
            if ModuleManager['sessions'].is_streaming():
                log.debug('Deferring sync task, server is currently streaming media')
                return True

            if not ModuleManager['sessions'].is_idle():
                log.debug(
                    'Deferring sync task, server has been streaming media recently (in the last %d minutes)',
                    Preferences.get('sync.idle_delay')
                )
                return True

        return False

    def finish(self):
        # Cleanup `current` task
        current = self.current
        current.finish()

        # Task finished
        log.info('(%r) Done', current.mode)

        # Cleanup sync manager
        self.current = None

    def cancel(self, id):
Bug Best Practice: This seems to re-define the built-in id.

It is generally discouraged to redefine built-ins as this makes code very hard to read.
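One way to resolve this is to rename the parameter, sketched below with the hypothetical name `task_id`. The standalone `cancel` function and the `FakeTask` class are simplified stand-ins written for this example. The sketch returns `False` when there is no active task, following the method's docstring; the analyzed listing returns `True` in that case, and which behavior is intended is not clear from the source.

```python
class FakeTask:
    """Hypothetical task exposing only what the sketch needs."""

    def __init__(self, task_id):
        self.id = task_id
        self.aborted = False

    def abort(self):
        self.aborted = True


def cancel(current, task_id):
    """Abort `current` if its id matches `task_id`; the built-in `id()` stays usable."""
    if current is None or current.id != task_id:
        return False

    current.abort()
    return True
```

Renaming a parameter changes the signature for callers that pass it by keyword, so call sites using `id=...` would need the same update.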
        """Trigger a currently running sync to abort

        Note: A sync will only cancel at the next "safe" cancel point, this will not
        force a thread to end immediately.

        :return: `True` if a sync has been triggered to cancel,
                 `False` if there was no sync to cancel.
        :rtype: bool
        """
        current = self.current

        if current is None:
            # No active sync task
            return True

        if current.id != id:
            # Active task doesn't match `id`
            return False

        # Request task abort
        current.abort(timeout=10)

        log.info('(%r) Abort', current.mode)
        return True

    def run(self):
        # Trigger sync methods
        self._trigger([
            'construct',
            'start',
            'run',
            'finish',
            'stop'
        ])

    def _trigger(self, names):
        if self.current.mode not in self.current.modes:
            log.warn('Unknown sync mode: %r', self.current.mode)
            return

        mode = self.current.modes[self.current.mode]

        for name in names:
            func = getattr(mode, name, None)

            if not func:
                log.warn('Unknown method: %r', name)
                return

            func()


Sync = Main()