User._iter_friends()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 14

Duplication

Lines 14
Ratio 100 %

Code Coverage

Tests 12
CRAP Score 7

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
c 1
b 0
f 0
dl 14
loc 14
rs 7.3333
ccs 12
cts 12
cp 1
crap 7
1
"""Classes and functions to interact with users."""
2
3 1
import os
4 1
import socket
5 1
import getpass
6 1
import shutil
7 1
import logging
8
9 1
import yaml
0 ignored issues
show
Configuration introduced by
The import yaml could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
10
11 1
from dtb.song import Song
12
13
14 1
class User(object):
0 ignored issues
show
Unused Code introduced by
This abstract class does not seem to be used anywhere.
Loading history...
15
    """Represents a user directory."""
16
17 1
    PRIVATE = '.dtb'
18 1
    INFO = os.path.join(PRIVATE, 'info.yml')  # list of computer settings
19 1
    SETTINGS = os.path.join(PRIVATE, 'settings.yml')  # general preferences
20 1
    REQUESTS = os.path.join(PRIVATE, 'requests.yml')
21 1
    LIBRARY = os.path.join(PRIVATE, 'library.sqlite3')
22 1
    DROPS = os.path.join(PRIVATE, 'drops')
23
24 1
    def __init__(self, path, _check=True):
25 1
        self.path = path
26 1
        if _check:
27 1
            self.check()
28
29 1
    def __str__(self):
30 1
        return str(self.path)
31
32 1
    def __eq__(self, other):
33 1
        return self.path == other.path
34
35 1
    def __ne__(self, other):
36 1
        return self.path != other.path
37
38 1
    @staticmethod
39 1
    def new(root, name, downloads=None):
40
        """Create a new user in the share location.
41
42
        @param root: path to root of sharing directory
43
        @param name: name of user's sharing folder
44
        @param downloads: path to user's downloads directory
45
46
        @return: new User
47
        """
48 1
        logging.debug("creating user '{}'...".format(name))
49 1
        path = os.path.join(root, name)
50 1
        if os.path.exists(path):
51 1
            raise EnvironmentError("user already exists: {}".format(path))
52 1
        downloads = downloads or os.path.expanduser('~/Downloads')
53
54
        # Create a new user
55 1
        user = User(path, _check=False)
56
57
        # Create directories
58 1
        os.makedirs(user.path_private)
59 1
        os.makedirs(user.path_drops)
60
61
        # Create info
62 1
        info = get_info()
63 1
        data = [{'computer': info[0],
64
                 'username': info[1],
65
                 'downloads': downloads}]
66 1
        text = yaml.dump(data, default_flow_style=False)
67 1
        logging.debug("saving {}...".format(user.path_info))
68 1
        with open(user.path_info, 'w') as outfile:
69 1
            outfile.write(text)
70
71
        # Create settings
72 1
        text = yaml.dump({}, default_flow_style=False)
73 1
        logging.debug("saving {}...".format(user.path_settings))
74 1
        with open(user.path_settings, 'w') as outfile:
75 1
            outfile.write(text)
76
77
        # Create requests
78 1
        text = yaml.dump([], default_flow_style=False)
79 1
        logging.debug("saving {}...".format(user.path_requests))
80 1
        with open(user.path_requests, 'w') as outfile:
81 1
            outfile.write(text)
82
83
        # Create folders for friends
84 1
        for name in os.listdir(root):
85 1
            friendpath = os.path.join(root, name)
86 1
            if name != user.name and os.path.isdir(friendpath):
87 1
                User._makedir(os.path.join(user.path, name))
88 1
                User._makedir(os.path.join(friendpath, user.name))
89
90
        # Return the new user
91 1
        logging.info("created user: {}".format(user))
92 1
        user.check()
93 1
        return user
94
95 1
    @staticmethod
96 1
    def add(root, name, downloads=None):
97
        """Add the current computer's information to an existing user.
98
99
        @param root: path to root of sharing directory
100
        @param name: name of existing user's sharing folder
101
        @param downloads: path to user's downloads directory
102
103
        @return: existing User
104
        """
105 1
        logging.debug("adding to user '{}'...".format(name))
106 1
        downloads = downloads or os.path.expanduser('~/Downloads')
107
        # Get the existing user
108 1
        user = User(os.path.join(root, name))
109
        # Update info
110 1
        logging.debug("loading {}...".format(user.path_info))
111 1
        with open(user.path_info, 'r') as infile:
112 1
            text = infile.read()
113 1
        data = yaml.load(text)
114 1
        info = get_info()
115 1
        if not isinstance(data, list):
116 1
            logging.warning("data reset due to config format change")
117 1
            data = []
118 1
        data.append({'computer': info[0],
119
                     'username': info[1],
120
                     'downloads': downloads})
121 1
        text = yaml.dump(data, default_flow_style=False)
122 1
        logging.debug("saving {}...".format(user.path_info))
123 1
        with open(user.path_info, 'w') as outfile:
124 1
            outfile.write(text)
125
        # Return the updated user
126 1
        return user
127
128
    # properties based on path #################################################
129
130 1
    @property
131
    def name(self):
132
        """Get the name of the user's folder."""
133 1
        return os.path.split(self.path)[1]
134
135 1
    @property
136
    def root(self):
137
        """Get the path to root of the sharing directory."""
138 1
        return os.path.split(self.path)[0]
139
140 1
    @property
141
    def path_private(self):
142
        """Get the path to the user's private directory."""
143 1
        return os.path.join(self.path, User.PRIVATE)
144
145 1
    @property
146
    def path_drops(self):
147
        """Get the path to the user's drops directory."""
148 1
        return os.path.join(self.path, User.DROPS)
149
150 1
    @property
151
    def path_info(self):
152
        """Get the path to the user's information file."""
153 1
        return os.path.join(self.path, User.INFO)
154
155 1
    @property
156
    def path_library(self):
157
        """Get the path to the user's library file."""
158 1
        return os.path.join(self.path, User.LIBRARY)
159
160 1
    @property
161
    def path_requests(self):
162
        """Get the path to the user's requests file."""
163 1
        return os.path.join(self.path, User.REQUESTS)
164
165 1
    @property
166
    def path_settings(self):
167
        """Get the path to the user's requests file."""
168 1
        return os.path.join(self.path, User.SETTINGS)
169
170
    # properties based on files ################################################
171
172 1
    @property
173
    def info(self):
174
        """Get a list of the user's information."""
175 1
        infos = []
176 1
        logging.debug("loading {}...".format(self.path_info))
177 1
        with open(self.path_info, 'r') as infile:
178 1
            text = infile.read()
179 1
        data = yaml.load(text)
180 1
        if isinstance(data, list):
181 1
            for info in data:
182 1
                computer = info.get('computer', None)
183 1
                username = info.get('username', None)
184 1
                infos.append((computer, username))
185 1
        return infos
186
187 1
    @property
188
    def path_downloads(self):
189
        """Get the user's download path."""
190 1
        downloads = None
191 1
        info = get_info()
192 1
        logging.debug("loading {}...".format(self.path_info))
193 1
        with open(self.path_info, 'r') as infile:
194 1
            text = infile.read()
195 1
        data = yaml.load(text)
196 1
        if isinstance(data, list):
197 1
            for info2 in data:
198 1
                comp_user = info[:2]
199 1
                comp_user2 = (info2.get('computer', None),
200
                              info2.get('username', None))
201 1
                if comp_user == comp_user2:
202 1
                    downloads = info2.get('downloads', None)
203 1
                    break
204 1
        return downloads
205
206 1
    @path_downloads.setter
207
    def path_downloads(self, downloads):
208
        """Set the user's download path."""
209
        # TODO: refactor all into one common reader and writer
210 1
        info = get_info()
211 1
        logging.debug("loading {}...".format(self.path_info))
212 1
        with open(self.path_info, 'r') as infile:
213 1
            text = infile.read()
214 1
        data = yaml.load(text)
215 1
        if not isinstance(data, list):
216 1
            logging.warning("data reset due to config format change")
217 1
            data = []
218 1
        for index, info2 in enumerate(data):
219 1
            if (info[0] == info2.get('computer', None) and
220
                    info[1] == info2.get('username', None)):
221 1
                info2['downloads'] = downloads
222 1
                data[index] = info2
223 1
                break
224
        else:
225 1
            data.append({'computer': info[0],
226
                         'username': info[1],
227
                         'downloads': downloads})
228 1
        text = yaml.dump(data, default_flow_style=False)
229 1
        logging.debug("saving {}...".format(self.path_info))
230 1
        with open(self.path_info, 'w') as outfile:
231 1
            outfile.write(text)
232
233 1
    @property
234
    def friends(self):
235
        """Iterate through the user's friends."""
236 1
        for friend in self._iter_friends():
237 1
            yield friend
238
239 1 View Code Duplication
    def _iter_friends(self, clean=False):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
240
        """Iterate through the user's friends with optional cleanup."""
241 1
        for directory in os.listdir(self.root):
242 1
            path = os.path.join(self.root, directory)
243 1
            try:
244 1
                user = User(path)
245 1
            except ValueError as err:
246 1
                logging.debug("invalid user: {}".format(err))
247 1
                if clean and os.path.isdir(path):
248 1
                    logging.warning("deleting invalid user: {}".format(path))
249 1
                    self._delete(path)
250
            else:
251 1
                if user.name != self.name:
252 1
                    yield user
253
254 1
    @property
255
    def incoming(self):
256
        """Iterate through the list of incoming songs."""
257 1
        found = False
258 1
        logging.debug("looking for incoming songs ({})...".format(self.name))
259 1
        for friendname in os.listdir(self.path):
260 1
            friendpath = os.path.join(self.path, friendname)
261 1
            if friendname != User.PRIVATE and os.path.isdir(friendpath):
262 1
                for filename in os.listdir(friendpath):
263 1
                    filepath = os.path.join(friendpath, filename)
264 1
                    song = Song(filepath, self.path_downloads, friendname)
265 1
                    found = True
266 1
                    logging.debug("incoming: {}".format(song))
267 1
                    yield song
268 1
        if not found:
269 1
            logging.debug("no incoming songs ({})".format(self.name))
270
271 1
    @property
272
    def outgoing(self):
273
        """Iterate through the list of outgoing songs."""
274 1
        found = False
275 1
        logging.debug("looking for outgoing songs ({})...".format(self.name))
276 1
        for friend in self.friends:
277 1
            for song in friend.incoming:
278 1
                if song.friendname == self.name:
279 1
                    found = True
280
                    # TODO: is this the best way to invert ownership?
281 1
                    song.friendname = friend.name
282 1
                    logging.debug("outgoing: {}".format(song))
283 1
                    yield song
284 1
        if not found:
285 1
            logging.debug("no outgoing songs ({})".format(self.name))
286
287
    # methods ##################################################################
288
289 1
    def cleanup(self):
290
        """Delete invalid users, unlinked songs, and empty directories."""
291 1
        logging.info("cleaning up {}...".format(self.root))
292
        # Delete unlinked songs
293 1
        paths = [os.path.join(self.path_drops, name)
294
                 for name in os.listdir(self.path_drops)]
295 1
        for song in self.outgoing:
296 1
            try:
297 1
                paths.remove(song.source)
298 1
            except ValueError:
299 1
                pass
300 1
        for path in paths:
301 1
            logging.info("deleting unlinked: {}".format(path))
302 1
            self._delete(path)
303
        # Delete non-friend directories
304 1
        names = [friend.name for friend in self._iter_friends(clean=True)]
305 1
        for name in os.listdir(self.path):
306 1
            path = os.path.join(self.path, name)
307 1
            if name not in names and name != User.PRIVATE:
308 1
                logging.warning("deleting non-friend: {}".format(path))
309 1
                self._delete(path)
310
311 1
    @staticmethod
312
    def _makedir(path):
313
        """Create a directory if needed."""
314 1
        if not os.path.exists(path):
315 1
            os.makedirs(path)
316
317 1
    @staticmethod
318
    def _delete(path):
319
        """Delete a file or directory."""
320 1
        logging.debug("deleting {}...".format(path))
321 1
        if os.path.isdir(path):
322 1
            shutil.rmtree(path)
323
        else:
324 1
            os.remove(path)
325
326 1
    def recommend(self, path, users=None):
327
        """Recommend a song to a list of users.
328
329
        @param path: path to file
330
        @param users: names of users or None for all
331
332
        @return: shared Song
333
        """
334 1
        logging.info("recommending {}...".format(path))
335 1
        shutil.copy(path, self.path_drops)
336 1
        song = Song(os.path.join(self.path_drops, os.path.basename(path)))
337 1
        for friend in self.friends:
338 1
            if not users or friend.name in users:
339 1
                song.link(os.path.join(friend.path, self.name))
340 1
        return song
341
342 1
    def request(self, song):
343
        """Request a new song."""
344 1
        raise NotImplementedError("TODO: implement song requests")
345
346 1
    def check(self):
347
        """Verify the user's directory is valid."""
348 1
        if not os.path.isdir(self.path):
349 1
            raise ValueError("not a directory: {}".format(self.path))
350 1
        for path in (self.path_private, self.path_drops):
351 1
            if not os.path.isdir(path):
352 1
                raise ValueError("missing folder: {}".format(path))
353
        # TODO: also check self.path_library when library support is added
354 1
        for path in (self.path_info, self.path_requests, self.path_settings):
355 1
            if not os.path.isfile(path):
356 1
                raise ValueError("missing file: {}".format(path))
357
358 1
    def delete(self):
359
        """Delete the user."""
360 1
        for friend in self.friends:
361 1
            path = os.path.join(friend.path, self.name)
362 1
            if os.path.exists(path):
363 1
                logging.info("deleting {}...".format(path))
364 1
                shutil.rmtree(path)
365 1
        logging.info("deleting {}...".format(self.path))
366 1
        shutil.rmtree(self.path)
367
368
369 1
def get_info():
370
    """Return the current computer name and user name."""
371 1
    return socket.gethostname(), getpass.getuser()  # pylint: disable=no-member
372
373
374 1 View Code Duplication
def get_current(root):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
375
    """Get the current user based on this computer's information.
376
377
    @param root: path to root of sharing directory
378
379
    @return: current User
380
    """
381 1
    info = get_info()
382 1
    logging.debug("looking for {} in {}...".format(info, root))
383 1
    for directory in os.listdir(root):
384 1
        path = os.path.join(root, directory)
385 1
        try:
386 1
            user = User(path)
387 1
        except ValueError as err:
388 1
            logging.debug("invalid user: {}".format(err))
389
        else:
390 1
            if info in user.info:
391 1
                logging.info("found user: {}".format(user))
392 1
                return user
393
394
    raise EnvironmentError("{} not found in {}".format(info, root))
395