Passed
Push — master ( 6e8656...9792e6 )
by Dean
05:08 queued 02:39
created

Main.queue()   F

Complexity

Conditions 14

Size

Total Lines 69

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 187.4082

Importance

Changes 3
Bugs 0 Features 0
Metric Value
dl 0
loc 69
ccs 1
cts 25
cp 0.04
rs 2.5118
c 3
b 0
f 0
cc 14
crap 187.4082
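
The CRAP score reported above is consistent with the commonly cited formula comp^2 * (1 - cov)^3 + comp, applied to the cyclomatic complexity (cc 14) and the coverage (ccs 1 of cts 25, cp 0.04). The sketch below assumes that formula and that reading of the abbreviations; the report itself does not define them.

```python
def crap_score(complexity, coverage):
    """Change Risk Anti-Patterns score for a single method.

    complexity: cyclomatic complexity of the method
    coverage:   fraction of the method covered by tests, in [0.0, 1.0]
    """
    return complexity ** 2 * (1.0 - coverage) ** 3 + complexity


# Values taken from the metrics above: cc 14, ccs 1, cts 25
score = crap_score(14, 1.0 / 25)
print(round(score, 3))  # 187.408
```

With full coverage the score would fall to the complexity itself (14), so both adding tests and reducing branching lower it.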

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.
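
As a minimal sketch of turning a commented block into a named method (the function names and data here are hypothetical and unrelated to the plugin):

```python
def summarize_before(prices):
    # Drop invalid (negative) prices
    valid = []
    for price in prices:
        if price >= 0:
            valid.append(price)

    return float(sum(valid)) / len(valid) if valid else 0.0


def drop_invalid_prices(prices):
    """Return only the non-negative prices (name derived from the old comment)."""
    return [price for price in prices if price >= 0]


def summarize_after(prices):
    # The comment is gone; the extracted method's name carries its meaning
    valid = drop_invalid_prices(prices)
    return float(sum(valid)) / len(valid) if valid else 0.0


print(summarize_before([1, -2, 3]))  # 2.0
print(summarize_after([1, -2, 3]))   # 2.0
```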

Complexity

Complex code like Main.queue() often does a lot of different things. To break it down, we need to identify a cohesive component within the enclosing class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
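
In this file, `_queue` and `_queue_lock` share a prefix, which makes them one candidate for extraction. A minimal sketch, assuming a hypothetical `TaskQueue` class (not part of the plugin) and a simplified duplicate check keyed on a single value:

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2


class TaskQueue(object):
    """Hypothetical class extracted from the `_queue*` fields."""

    def __init__(self):
        self._items = queue.PriorityQueue()
        self._lock = threading.Lock()

    def put_unique(self, priority, key, task):
        """Queue `task` unless an entry with the same `key` is already pending."""
        with self._lock:
            if any(k == key for (_, k, _) in self._items.queue):
                return False

            self._items.put((priority, key, task), block=False)
            return True

    def get(self, timeout=None):
        """Return the pending entry with the lowest priority value."""
        return self._items.get(timeout=timeout)


tasks = TaskQueue()
print(tasks.put_unique(10, 1, 'first'))   # True
print(tasks.put_unique(10, 1, 'second'))  # False
```

The owning class would then hold one `TaskQueue` instance instead of a queue and a lock, and the "one task per account" rule would live next to the data it guards.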

from plugin.core.message import InterfaceMessages
from plugin.models import SyncResult
from plugin.modules.core.manager import ModuleManager
from plugin.preferences import Preferences
from plugin.sync.core.enums import SyncMedia
from plugin.sync.core.exceptions import QueueError
from plugin.sync.core.task import SyncTask
from plugin.sync.handlers import *

Coding Style
The usage of wildcard imports like plugin.sync.handlers should generally be avoided.

from plugin.sync.modes import *

Coding Style
The usage of wildcard imports like plugin.sync.modes should generally be avoided.

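
A sketch of the explicit alternative, shown with a standard-library module so that it runs on its own. For this file, the equivalent would presumably list the names used in HANDLERS and MODES; that assumes `plugin.sync.handlers` and `plugin.sync.modes` export exactly those names, which this page does not confirm.

```python
# Wildcard form: every public name of the module lands in this namespace,
# so readers and linters cannot tell where `join` or `basename` come from.
#   from os.path import *

# Explicit form: only the names that are actually used are imported.
from os.path import basename, join

path = join("plugin", "sync", "handlers.py")
print(basename(path))  # handlers.py

# Assumed equivalent for this file (not verified against the package):
#   from plugin.sync.handlers import Collection, List, Playback, Ratings, Watched
#   from plugin.sync.modes import FastPull, Full, Pull, Push
```
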
from plugin.sync.triggers import LibraryUpdateTrigger

from datetime import datetime, timedelta
from plex_database.library import TZ_LOCAL
from threading import Lock, Thread
import logging
import Queue

Configuration
The import Queue could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint; when installing your modules, make sure to use the command for the correct Python version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.
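
A further possible cause is that the analysis ran under Python 3, where the Python 2 `Queue` module is named `queue`. A minimal sketch of an import that resolves on both, assuming the code is meant to run on either version:

```python
try:
    import Queue as queue  # Python 2 module name
except ImportError:
    import queue  # Python 3 module name

tasks = queue.PriorityQueue()
tasks.put((10, "low priority"))
tasks.put((1, "high priority"))

# Entries come out in ascending priority order
print(tasks.get())  # (1, 'high priority')
```

With this alias, references such as `Queue.PriorityQueue()` and `Queue.Empty` would become `queue.PriorityQueue()` and `queue.Empty`.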

import sys
import time

log = logging.getLogger(__name__)

HANDLERS = [
    Collection,
    List,
    Playback,
    Ratings,
    Watched
]

MODES = [
    FastPull,
    Full,
    Pull,
    Push
]

# Display error if the system timezone is not available
if TZ_LOCAL is None:
    InterfaceMessages.add(logging.ERROR, 'Unable to retrieve system timezone, syncing will not be available')


class Main(object):
    def __init__(self):
        self.current = None

        self._queue = Queue.PriorityQueue()
        self._queue_lock = Lock()

        self._spawn_lock = Lock()
        self._thread = None

        # Triggers
        self._library_update = LibraryUpdateTrigger(self)

    def queue(self, account, mode, data=None, media=SyncMedia.All, priority=10, trigger=SyncResult.Trigger.Manual, **kwargs):

Coding Style
This line is too long as per the coding-style (125/120).

This check looks for lines that are too long. You can specify the maximum line length.

        """Queue a sync for the provided account

        Note: if a sync is already queued for the provided account, a `QueueError` will be raised.

        :param account: Account to synchronize with trakt
        :type account: int or plugin.models.Account

        :param mode: Syncing mode (pull, push, etc.)
        :type mode: int (plugin.sync.SyncMode)

        :param data: Data to synchronize (collection, ratings, etc.)
        :type data: int (plugin.sync.SyncData)

        :param media: Media to synchronize (movies, shows, etc.)
        :type media: int (plugin.sync.SyncMedia)

        :return: `None` once the task has started; a `QueueError` is raised
                 if the task has not started within 10 seconds.
        """
        if InterfaceMessages.critical:

Bug
The class InterfaceMessages does not seem to have a member named critical.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

            raise QueueError('Error', InterfaceMessages.message)

Bug
The class InterfaceMessages does not seem to have a member named message.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

        if TZ_LOCAL is None:
            raise QueueError('Error', 'Unable to retrieve system timezone')

        try:
            # Create new task
            task = SyncTask.create(account, mode, data, media, trigger, **kwargs)
        except QueueError:
            # Re-raise queue errors with their original traceback
            raise
        except Exception as ex:
            log.warning('Unable to construct task: %s', ex, exc_info=True)

            # Raise as queue error
            raise QueueError('Error', 'Unable to construct task: %s' % ex)

        with self._queue_lock:
            # Ensure we only have one task queued per account
            account_tasks = [
                t for (p, a, t) in self._queue.queue
                if (
                    a == task.account.id and
                    t.result and
                    (trigger != SyncResult.Trigger.Manual or t.result.trigger == trigger)
                )
            ]

            if len(account_tasks):
                raise QueueError("Unable to queue sync", "Sync has already been queued for this account")

            # Queue task until the thread is available
            self._queue.put((priority, task.account.id, task), block=False)

            # Ensure thread is active
            self.spawn()

        # Wait for task start
        for x in xrange(10):

Comprehensibility Best Practice
Undefined variable 'xrange'

Unused Code
The variable x seems to be unused.

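
Both findings can be addressed with `range` and a throwaway loop variable. A self-contained sketch of the same polling pattern, assuming a hypothetical `wait_for_start` helper and a stand-in task object (the interval is shortened so the example finishes quickly):

```python
import time


class FakeTask(object):
    """Stand-in for a sync task; only the `started` flag matters here."""
    started = False


def wait_for_start(task, attempts=10, interval=1.0):
    """Poll `task.started` up to `attempts` times, sleeping between checks."""
    for _ in range(attempts):  # `_` marks the counter as intentionally unused
        if task.started:
            return True

        time.sleep(interval)

    return False


task = FakeTask()
print(wait_for_start(task, attempts=3, interval=0.01))  # False

task.started = True
print(wait_for_start(task, attempts=3, interval=0.01))  # True
```

On Python 2, `range(10)` builds a ten-element list, which is negligible at this size.
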
            if task.started:
                log.debug('Task %r has started', task)
                return

            time.sleep(1)

        raise QueueError("Sync queued", "Sync will start once the currently queued tasks have finished")

    def spawn(self):
        """Ensure syncing thread has been spawned"""
        with self._spawn_lock:
            if self._thread is not None:
                return

            self._thread = Thread(target=self.run_wrapper)
            self._thread.start()

            log.debug('Spawned syncing thread: %r', self._thread)

    def run_wrapper(self):
        while True:
            try:
                # Retrieve task from queue
                try:
                    priority, account_id, task = self._queue.get(timeout=30)
                except Queue.Empty:
                    continue

                # Check if we should defer this task
                if self.should_defer(task):
                    # Re-queue sync task
                    if priority < 10000:
                        priority += 1

                    self._queue.put((priority, account_id, task), block=False)

                    # Wait 10 seconds
                    time.sleep(10)
                    continue

                # Select task
                self.current = task
            except Exception as ex:

Best Practice
Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensures that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.

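
A sketch of narrower handlers around a queue read. Which exceptions are worth naming in `run_wrapper` depends on what the queue and `should_defer` can raise, which this page does not show; the `next_task` helper and the exception types chosen below are assumptions for illustration.

```python
import logging

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

log = logging.getLogger(__name__)


def next_task(tasks, timeout=0.01):
    """Return the next queued item, or None if nothing arrived in time."""
    try:
        return tasks.get(timeout=timeout)
    except queue.Empty:
        # Expected condition: nothing to do right now
        return None
    except (TypeError, ValueError) as ex:
        # Narrow handler for malformed arguments; anything else propagates
        log.warning('Unable to retrieve task: %s', ex)
        return None


tasks = queue.PriorityQueue()
print(next_task(tasks))  # None

tasks.put((10, 'sync'))
print(next_task(tasks))  # (10, 'sync')
```

For a long-running worker thread, a broad handler at the top of the loop can still be a deliberate choice, since an uncaught exception would end the thread.
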
                log.warning('Exception raised while attempting to retrieve sync task from queue: %s', ex, exc_info=True)

                time.sleep(30)
                continue

            # Start task
            try:
                log.info('(%r) Started', self.current.mode)
                self.current.started = True

                # Construct modes/handlers for task
                self.current.construct(HANDLERS, MODES)

                # Run in plex authorization context
                with self.current.account.plex.authorization():
                    # Run in trakt authorization context
                    with self.current.account.trakt.authorization():
                        # Run sync
                        self.run()

                self.current.success = True
            except Exception as ex:

Best Practice
Catching very general exceptions such as Exception is usually not recommended.

                log.warning('Exception raised in sync task: %s', ex, exc_info=True)

                self.current.exceptions.append(sys.exc_info())
                self.current.success = False

            try:
                # Sync task complete, run final tasks
                self.finish()
            except Exception as ex:

Best Practice
Catching very general exceptions such as Exception is usually not recommended.

                log.error('Exception raised while attempting to finish sync task: %s', ex, exc_info=True)

    def should_defer(self, task):

Coding Style
This method could be written as a function/class method.

If a method does not access any attributes of the class, it could also be implemented as a function or static method. This can help improve readability. For example,

class Foo:
    def some_method(self, x, y):
        return x + y

could be written as

class Foo:
    @classmethod
    def some_method(cls, x, y):
        return x + y

        if task and task.result:
            # Ignore sync conditions on manual triggers
            if task.result.trigger == SyncResult.Trigger.Manual:
                return False

            # Ignore sync conditions if the task has been queued for over 12 hours
            started_ago = datetime.utcnow() - task.result.started_at

            if started_ago > timedelta(hours=12):
                log.debug('Task has been queued for over 12 hours, ignoring sync conditions')
                return False

        if Preferences.get('sync.idle_defer'):
            # Defer sync tasks until server finishes streaming (and is idle for 30 minutes)
            if ModuleManager['sessions'].is_streaming():
                log.debug('Deferring sync task, server is currently streaming media')
                return True

            if not ModuleManager['sessions'].is_idle():
                log.debug(
                    'Deferring sync task, server has been streaming media recently (in the last %d minutes)',
                    Preferences.get('sync.idle_delay')
                )
                return True

        return False

    def finish(self):
        # Cleanup `current` task
        current = self.current
        current.finish()

        # Task finished
        log.info('(%r) Done', current.mode)

        # Cleanup sync manager
        self.current = None

    def cancel(self, id):

Bug Best Practice
This seems to re-define the built-in id.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

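
One way to avoid the shadowing is to rename the parameter. A minimal sketch, assuming a hypothetical `task_id` parameter name and stand-in `Task` and `matches` definitions; any caller passing `id=` as a keyword would need the same rename.

```python
class Task(object):
    """Stand-in for a sync task with an `id` attribute."""

    def __init__(self, task_id):
        self.id = task_id


def matches(current, task_id):
    """Return True if `current` is the task identified by `task_id`."""
    # The built-in `id()` remains usable in this scope because the
    # parameter no longer shadows it.
    return current is not None and current.id == task_id


task = Task(7)
print(matches(task, 7))  # True
print(matches(None, 7))  # False
```
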
        """Trigger a currently running sync to abort

        Note: A sync will only cancel at the next "safe" cancel point; this will not
        force a thread to end immediately.

        :return: `True` if a sync has been triggered to cancel,
                 `False` if there was no sync to cancel.
        :rtype: bool
        """
        current = self.current

        if current is None:
            # No active sync task
            return True

        if current.id != id:
            # Active task doesn't match `id`
            return False

        # Request task abort
        current.abort(timeout=10)

        log.info('(%r) Abort', current.mode)
        return True

    def run(self):
        # Trigger sync methods
        self._trigger([
            'construct',
            'start',
            'run',
            'finish',
            'stop'
        ])

    def _trigger(self, names):
        if self.current.mode not in self.current.modes:
            log.warning('Unknown sync mode: %r', self.current.mode)
            return

        mode = self.current.modes[self.current.mode]

        for name in names:
            func = getattr(mode, name, None)

            if not func:
                log.warning('Unknown method: %r', name)
                return

            func()


Sync = Main()