Base.format_pending()   F

Complexity

Conditions 10

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 95.7375

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 33
ccs 1
cts 20
cp 0.05
rs 3.1304
cc 10
crap 95.7375
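
The CRAP score above can be reproduced from the other metrics. The sketch below uses the commonly published CRAP formula and assumes that cc is the cyclomatic complexity and cp is the covered fraction (0.05, which matches ccs / cts = 1 / 20); crap_score is a hypothetical helper name, not part of the analysis tool.

```python
def crap_score(complexity, coverage):
    """CRAP = comp^2 * (1 - cov)^3 + comp, with coverage given as a fraction in [0, 1]."""
    return complexity ** 2 * (1.0 - coverage) ** 3 + complexity


# Values taken from the metric table: cc = 10, cp = 0.05
score = crap_score(complexity=10, coverage=0.05)
print(round(score, 4))  # 95.7375
```

With full coverage the cubic term vanishes and the score falls to the complexity itself (10 here), so under this formula both more tests and fewer branches lower the score.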

How to fix

Complexity

Complex classes and methods like Base.format_pending() often do a lot of different things. To break such code down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
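
Because Base.format_pending() is a single method rather than a class, the closest analogue of the advice above is Extract Method. The following is a minimal sketch, not the project's code: format_key, format_child_key and format_children are hypothetical helper names, the functions are written at module level so the example is self-contained, and the sample keys are invented. It is intended to return the same values as the original method while leaving each function with only one or two branches.

```python
def format_key(key):
    """Join the parts of a tuple key with '/'."""
    return '/'.join(str(part) for part in key)


def format_child_key(c_key):
    """Format one child key: 2-tuples become SxxEyy, other tuples are joined."""
    if type(c_key) is tuple and len(c_key) == 2:
        return 'S%02dE%02d' % c_key
    if type(c_key) is tuple:
        return format_key(c_key)
    return c_key


def format_children(children):
    """Return the formatted child keys in sorted order."""
    return sorted(format_child_key(c_key) for c_key in children)


def format_pending(items):
    result = {}
    child_count = 0

    for key, children in items:
        formatted = format_key(key)

        if children is None:
            # Show/movie without children
            result[formatted] = None
            continue

        result[formatted] = format_children(children)
        child_count += len(result[formatted])

    if not child_count:
        return len(result), sorted(result)

    return child_count, result


# Invented sample data: one show with two pending episodes
print(format_pending([(('imdb', 'tt0000001'), [(1, 2), (1, 1)])]))
# (2, {'imdb/tt0000001': ['S01E01', 'S01E02']})
```

Splitting the key formatting out of the loop removes the nested conditionals from format_pending itself, which is where most of the 10 reported conditions come from.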

from plugin.core.environment import Environment
from plugin.sync.core.enums import SyncMode, SyncMedia
from plugin.sync.modes.core.base import Mode

import json
import logging
import os

log = logging.getLogger(__name__)


class Base(Mode):
Bug: The method run, which was declared abstract in the super-class Mode, was not overridden.

Methods which raise NotImplementedError should be overridden in concrete child classes.
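
As an illustration of the rule above, here is a minimal sketch using stand-in classes. The Mode and Base classes below are simplified assumptions rather than the plugin's real classes, and Movies and describe() are hypothetical names introduced only for this example.

```python
class Mode(object):
    """Stand-in for the plugin's Mode class (assumed shape, not the real one)."""

    def run(self):
        # Abstract by convention: subclasses are expected to replace this.
        raise NotImplementedError


class Base(Mode):
    """Shared helpers only; run() is still the inherited abstract stub."""

    def describe(self):
        return 'push'


class Movies(Base):
    """Hypothetical concrete mode that finally supplies run()."""

    def run(self):
        return 'running %s mode for movies' % self.describe()


print(Movies().run())  # running push mode for movies
```

If Base is only ever used as a parent for concrete modes, the warning may be harmless, but instantiating Base directly and calling run() would raise NotImplementedError.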

    mode = SyncMode.Push

    def __init__(self, task):
        super(Base, self).__init__(task)

    #
    # Execute handlers
    #

    def execute_movie(self, mo_id, pk, guid, p_item):
        # Execute handlers for each data type
        for data in self.get_data(SyncMedia.Movies):
            t_movie = self.trakt[(SyncMedia.Movies, data)].get(pk)

            self.execute_handlers(
                self.mode, SyncMedia.Movies, data,

                key=mo_id,

                guid=guid,
                p_item=p_item,

                t_item=t_movie
            )

    def execute_show(self, sh_id, pk, guid, p_show):
        for data in self.get_data(SyncMedia.Shows):
            t_show = self.trakt[(SyncMedia.Shows, data)].get(pk)

            # Execute show handlers
            self.execute_handlers(
                self.mode, SyncMedia.Shows, data,
                key=sh_id,
                guid=guid,

                p_item=p_show,

                t_item=t_show
            )

    def execute_episode(self, ep_id, pk, guid, identifier, p_show, p_season, p_episode):
Unused code: The argument p_season seems to be unused.
        season_num, episode_num = identifier

        # Execute handlers for each data type
        for data in self.get_data(SyncMedia.Episodes):
            t_show, t_season, t_episode = self.t_objects(
Unused code: The variable t_season seems to be unused.
                self.trakt[(SyncMedia.Episodes, data)], pk,
                season_num, episode_num
            )

            # Execute episode handlers
            self.execute_handlers(
                self.mode, SyncMedia.Episodes, data,

                key=ep_id,
                identifier=identifier,

                guid=guid,
                p_show=p_show,
                p_item=p_episode,

                t_show=t_show,
                t_item=t_episode
            )

    #
    # Helpers
    #

    @staticmethod
    def t_objects(collection, pk, season_num, episode_num):
        # Try find trakt `Show` from `collection`
        t_show = collection.get(pk)

        if t_show is None:
            return t_show, None, None

        # Try find trakt `Season`
        t_season = t_show.seasons.get(season_num)

        if t_season is None:
            return t_show, t_season, None

        # Try find trakt `Episode`
        t_episode = t_season.episodes.get(episode_num)

        return t_show, t_season, t_episode

    @classmethod
    def log_pending(cls, log, message, account, key, items):
Comprehensibility bug: The parameter log redefines a name that is already available in the outer scope (previously defined on line 9).

It is generally bad practice to shadow variables from the outer scope. In most cases this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
        if type(items) is set:
            items = [
                (k, None)
                for k in items
            ]
        elif type(items) is dict:
            items = [
                (k, v)
                for k, v in items.items()
                if len(v) > 0
            ]
        else:
            raise ValueError('Unknown type for "pending" parameter')

        if len(items) < 1:
            return

        # Format items
        count, keys = cls.format_pending(items)

        # Update pending items report
        try:
            report_path = cls.write_pending(account, key, keys)
        except Exception as ex:
Best practice: Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensures that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.
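
One way to narrow the handler is sketched below. It assumes that only file-system failures are expected while writing a report; save_report is a hypothetical standalone function for illustration, not the plugin's write_pending, and the right set of exceptions for the real code may differ.

```python
import json
import logging
import os
import tempfile

log = logging.getLogger(__name__)


def save_report(path, keys):
    """Write keys as JSON; return the path, or None if the file system refused."""
    try:
        with open(path, 'w') as fp:
            json.dump(keys, fp, sort_keys=True, indent=4)
    except (IOError, OSError) as ex:
        # Only file-system failures are handled here; anything else
        # (e.g. a TypeError from unserialisable data) still propagates.
        log.warning('Unable to save report: %s', ex)
        return None

    return path


directory = tempfile.mkdtemp()

# Succeeds: the directory exists and the data is serialisable
print(save_report(os.path.join(directory, 'pending.json'), ['a', 'b']) is not None)

# Returns None: the parent directory does not exist
print(save_report(os.path.join(directory, 'missing', 'pending.json'), []))
```

With this shape, a programming error such as passing a set to json.dump surfaces as a TypeError instead of being logged as a failed save.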

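            log.warning('Unable to save report: %s', ex, exc_info=True)
            report_path = None

        # Display message in log file (relpath() cannot handle None, so only
        # shorten the path when the report was actually written)
        if report_path is not None:
            report_path = os.path.relpath(report_path, Environment.path.home)

        log.info(message, count, report_path)

    @classmethod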
    def write_pending(cls, account, key, keys):
        directory = os.path.join(Environment.path.plugin_data, 'Reports', 'Sync', str(account.id), 'Pending')
        path = os.path.join(directory, '%s.json' % key)

        # Ensure directory exists
        if not os.path.exists(directory):
            os.makedirs(directory)

        # Write items
        with open(path, 'w') as fp:
            json.dump(
                keys, fp,
                sort_keys=True,
                indent=4,
                separators=(',', ': ')
            )

        return path

    @classmethod
    def format_pending(cls, items):
        result = {}
        child_count = 0

        for key, children in items:
            key = '/'.join([str(k) for k in key])

            # Set show/movie
            result[key] = None

            if children is None:
                continue

            # Append keys of children
            result[key] = []

            for c_key in children:
                if type(c_key) is tuple and len(c_key) == 2:
                    c_key = 'S%02dE%02d' % c_key
                elif type(c_key) is tuple:
                    c_key = '/'.join([str(k) for k in c_key])

                result[key].append(c_key)
                child_count += 1

            # Sort children keys
            result[key] = sorted(result[key])

        if not child_count:
            return len(result), sorted(result.keys())

        return child_count, result