PLSFileParser.__init__()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
dl 0
loc 6
rs 9.4285
c 1
b 0
f 0
1
2
import sys
3
import codecs
4
import os.path
5
import warnings
6
7
from werkzeug.utils import cached_property
8
9
from browsepy.compat import range, PY_LEGACY  # noqa
10
from browsepy.file import Node, File, Directory, \
11
                          underscore_replace, check_under_base
12
13
14
if PY_LEGACY:
15
    import ConfigParser as configparser
16
else:
17
    import configparser
18
19
# Use the legacy SafeConfigParser class when the running interpreter still
# provides it, otherwise use the plain ConfigParser class.
ConfigParserBase = getattr(
    configparser,
    'SafeConfigParser',
    configparser.ConfigParser
    )
24
25
26
class PLSFileParser(object):
    '''
    ConfigParser wrapper accepting fallback on get for convenience.

    This wraps instead of inheriting because ConfigParser is a classobj on
    python2.

    Both :meth:`get` and :meth:`getint` return the given fallback when the
    section or the option is missing, or when the lookup raises ValueError
    (e.g. a non-integer value on getint). Without a fallback the original
    exception is propagated.
    '''
    # Sentinel meaning "no fallback given", so None stays a usable fallback.
    NOT_SET = type('NotSetType', (object,), {})
    parser_class = (
        configparser.SafeConfigParser
        if hasattr(configparser, 'SafeConfigParser') else
        configparser.ConfigParser
        )
    # Lookup errors which are replaced by the fallback value, when given.
    # NoSectionError is included so a file lacking the requested section
    # behaves like a file lacking the requested option.
    _fallback_errors = (
        configparser.NoSectionError,
        configparser.NoOptionError,
        ValueError,
        )

    def __init__(self, path):
        '''
        Create the underlying parser and read given file.

        :param path: path of the ini-like file to read
        '''
        with warnings.catch_warnings():
            # We already know about SafeConfigParser deprecation!
            warnings.filterwarnings('ignore', category=DeprecationWarning)
            self._parser = self.parser_class()
        self._parser.read(path)

    def _lookup(self, getter, section, key, fallback):
        '''
        Call given parser getter, honoring fallback on lookup errors.

        :param getter: bound method of the underlying parser
        :param section: section name
        :param key: option name
        :param fallback: value returned on error, or NOT_SET to re-raise
        :returns: value returned by getter, or fallback
        '''
        try:
            return getter(section, key)
        except self._fallback_errors:
            if fallback is self.NOT_SET:
                raise
            return fallback

    def getint(self, section, key, fallback=NOT_SET):
        '''
        Get option value as integer.

        :param section: section name
        :param key: option name
        :param fallback: value returned if the section or option is missing
                         or the value is not an integer
        :returns: integer value, or fallback
        :raises configparser.NoSectionError: missing section, no fallback
        :raises configparser.NoOptionError: missing option, no fallback
        :raises ValueError: non-integer value, no fallback
        '''
        return self._lookup(self._parser.getint, section, key, fallback)

    def get(self, section, key, fallback=NOT_SET):
        '''
        Get option value.

        :param section: section name
        :param key: option name
        :param fallback: value returned if the section or option is missing
        :returns: option value, or fallback
        :raises configparser.NoSectionError: missing section, no fallback
        :raises configparser.NoOptionError: missing option, no fallback
        '''
        return self._lookup(self._parser.get, section, key, fallback)
61
62
63
class PlayableBase(File):
    '''
    File class holding the extension to mimetype mapping shared by playable
    files and playlists.
    '''
    extensions = {
        'mp3': 'audio/mpeg',
        'ogg': 'audio/ogg',
        'wav': 'audio/wav',
        'm3u': 'audio/x-mpegurl',
        'm3u8': 'audio/x-mpegurl',
        'pls': 'audio/x-scpls',
    }

    @classmethod
    def extensions_from_mimetypes(cls, mimetypes):
        '''
        Get the subset of the extension mapping for given mimetypes.

        :param mimetypes: iterable of mimetype strings
        :returns: dict of extension to mimetype
        '''
        wanted = frozenset(mimetypes)
        return dict(
            (extension, mime)
            for extension, mime in cls.extensions.items()
            if mime in wanted
            )

    @classmethod
    def detect(cls, node, os_sep=os.sep):
        '''
        Get the mimetype registered for the extension of given node path.

        :param node: object with a path attribute
        :param os_sep: path separator, defaults to os.sep
        :returns: mimetype string, or None if the name has no extension or
                  the extension is not registered on this class
        '''
        filename = node.path.split(os_sep)[-1]
        _, dot, extension = filename.rpartition('.')
        if not dot:
            return None
        return cls.extensions.get(extension)
89
90
91
class PlayableFile(PlayableBase):
    '''
    Audio file with optional duration and title metadata.
    '''
    mimetypes = ['audio/mpeg', 'audio/ogg', 'audio/wav']
    extensions = PlayableBase.extensions_from_mimetypes(mimetypes)
    # Reverse mapping, from mimetype to extension.
    media_map = dict((mime, ext) for ext, mime in extensions.items())

    def __init__(self, **kwargs):
        '''
        :param duration: optional duration, removed from kwargs
        :param title: optional title, removed from kwargs
        :param kwargs: remaining keyword arguments, passed to parent class
        '''
        duration = kwargs.pop('duration', None)
        title = kwargs.pop('title', None)
        self.duration = duration
        self.title = title
        super(PlayableFile, self).__init__(**kwargs)

    @property
    def title(self):
        '''
        Title given on initialization, or the file name when it is unset or
        empty.
        '''
        if self._title:
            return self._title
        return self.name

    @title.setter
    def title(self, title):
        self._title = title

    @property
    def media_format(self):
        '''
        Extension registered on media_map for this file type.
        '''
        file_type = self.type
        return self.media_map[file_type]
112
113
114
class PlayListFile(PlayableBase):
    '''
    Base class for playlist files.

    Subclasses implement :meth:`_entries` yielding the files listed on the
    playlist; :meth:`entries` filters them down to playable ones.
    '''
    playable_class = PlayableFile
    mimetypes = ['audio/x-mpegurl', 'audio/x-mpegurl', 'audio/x-scpls']
    extensions = PlayableBase.extensions_from_mimetypes(mimetypes)

    @classmethod
    def from_urlpath(cls, path, app=None):
        '''
        Get the node for given url path, using a playable directory or
        playlist class when its mimetype matches one of them.

        :param path: url path, passed to Node.from_urlpath
        :param app: optional application, passed to Node.from_urlpath
        :returns: PlayableDirectory, M3UFile or PLSFile instance when the
                  mimetype matches, the original node otherwise
        '''
        original = Node.from_urlpath(path, app)
        if original.mimetype == PlayableDirectory.mimetype:
            return PlayableDirectory(original.path, original.app)
        if original.mimetype == M3UFile.mimetype:
            return M3UFile(original.path, original.app)
        if original.mimetype == PLSFile.mimetype:
            return PLSFile(original.path, original.app)
        return original

    def normalize_playable_path(self, path):
        '''
        Convert a playlist entry into an usable location.

        Entries containing '://' are returned untouched. Any other entry
        is converted to a normalized filesystem path (relative ones are
        resolved against the directory of the playlist) and it is only
        returned if it lies under the configured 'directory_base'.

        :param path: entry as written on the playlist
        :returns: url, filesystem path, or None if the path is outside the
                  base directory
        '''
        if '://' in path:
            return path
        path = os.path.normpath(path)
        if os.path.isabs(path):
            # Absolute entries without drive inherit the playlist drive.
            drive = os.path.splitdrive(self.path)[0]
            if drive and not os.path.splitdrive(path)[0]:
                path = drive + path
        else:
            # Normalize after joining so '..' components are resolved
            # before the base directory check, otherwise a relative entry
            # could point outside of the base directory.
            path = os.path.normpath(os.path.join(self.parent.path, path))
        if check_under_base(path, self.app.config['directory_base']):
            return path
        return None

    def _entries(self):
        '''
        Yield playlist entries, none on this base class.
        '''
        return
        yield  # noqa

    def entries(self, sortkey=None, reverse=None):
        '''
        Yield playlist entries detected as playable files.

        :param sortkey: accepted but not used by this implementation
        :param reverse: accepted but not used by this implementation
        '''
        for file in self._entries():
            if PlayableFile.detect(file):
                yield file
151
152
153
class PLSFile(PlayListFile):
    '''
    Playlist file in PLS (ini-like) format.
    '''
    ini_parser_class = PLSFileParser
    maxsize = getattr(sys, 'maxint', 0) or getattr(sys, 'maxsize', 0) or 2**32
    mimetype = 'audio/x-scpls'
    extensions = PlayableBase.extensions_from_mimetypes([mimetype])

    def _entries(self):
        '''
        Yield a playable_class instance per usable 'FileN' option found on
        the 'playlist' section.

        When 'NumberOfEntries' is available, missing entries are skipped up
        to that number; otherwise iteration stops at the first missing one.
        '''
        section = 'playlist'
        parser = self.ini_parser_class(self.path)
        declared = parser.getint(section, 'NumberOfEntries', None)
        stop = self.maxsize if declared is None else declared + 1
        for index in range(1, stop):
            location = parser.get(section, 'File%d' % index, None)
            if not location:
                if declared:
                    # Entry count is known: tolerate gaps on numbering.
                    continue
                break
            location = self.normalize_playable_path(location)
            if not location:
                continue
            yield self.playable_class(
                path=location,
                app=self.app,
                duration=parser.getint(section, 'Length%d' % index, None),
                title=parser.get(section, 'Title%d' % index, None),
                )
184
185
186
class M3UFile(PlayListFile):
    '''
    Playlist file in M3U format, with optional extended (#EXTINF) metadata.
    '''
    mimetype = 'audio/x-mpegurl'
    extensions = PlayableBase.extensions_from_mimetypes([mimetype])

    def _iter_lines(self):
        '''
        Yield non-empty lines of the playlist file, right-stripped.

        File is decoded as utf-8 if its path ends with '.m3u8', as ascii
        otherwise, using the underscore_replace error handler. A leading
        '#EXTM3U' header line is skipped.
        '''
        prefix = '#EXTM3U\n'
        encoding = 'utf-8' if self.path.endswith('.m3u8') else 'ascii'
        with codecs.open(
          self.path, 'r',
          encoding=encoding,
          errors=underscore_replace
          ) as f:
            # Rewind when the file does not start with the header, so its
            # first line is not lost.
            if f.read(len(prefix)) != prefix:
                f.seek(0)
            for line in f:
                line = line.rstrip()
                if line:
                    yield line

    @staticmethod
    def _parse_extinf(payload):
        '''
        Parse the content of an '#EXTINF:' directive, prefix excluded.

        :param payload: text in the form 'duration,title'
        :returns: dict with 'duration' (int, or None when it is -1 or not
                  an integer) and 'title' (text after the first comma)
        '''
        duration, _, title = payload.partition(',')
        try:
            seconds = int(duration.strip())
        except ValueError:
            seconds = None
        if seconds == -1:
            # -1 is used on playlists to mean unknown duration.
            seconds = None
        return {'duration': seconds, 'title': title}

    def _entries(self):
        '''
        Yield a playable_class instance per usable entry line.

        Metadata from an '#EXTINF:' directive is applied to the entry line
        following it. Any other line starting with '#' is ignored.
        '''
        directive = '#EXTINF:'
        data = {}
        for line in self._iter_lines():
            if line.startswith(directive):
                data.update(self._parse_extinf(line[len(directive):]))
                continue
            if line.startswith('#'):
                # Comment or unsupported directive, not an entry.
                continue
            path = self.normalize_playable_path(line)
            if path:
                yield self.playable_class(path=path, app=self.app, **data)
            # Metadata only applies to the entry right after it.
            data.clear()
218
219
220
class PlayableDirectory(Directory):
    '''
    Directory listing only its playable files.
    '''
    file_class = PlayableFile
    name = ''

    @cached_property
    def parent(self):
        '''
        Directory instance created from the path of this same node.
        '''
        return Directory(self.path)

    @classmethod
    def detect(cls, node):
        '''
        Get this class mimetype if given node is a directory with at least
        one playable file.

        :param node: node object
        :returns: mimetype of this class, or None
        '''
        if not node.is_directory:
            return None
        for entry in node._listdir():
            if PlayableFile.detect(entry):
                return cls.mimetype
        return None

    def entries(self, sortkey=None, reverse=None):
        '''
        Yield directory entries detected as playable files.

        :param sortkey: passed to the parent class listdir
        :param reverse: passed to the parent class listdir
        '''
        listing = super(PlayableDirectory, self).listdir(
            sortkey=sortkey,
            reverse=reverse
            )
        for entry in listing:
            if PlayableFile.detect(entry):
                yield entry
241
242
243
def detect_playable_mimetype(path, os_sep=os.sep):
    '''
    Get the mimetype registered on PlayableBase for given path extension.

    :param path: filesystem path
    :param os_sep: path separator, defaults to os.sep
    :returns: mimetype string, or None if the name has no extension or the
              extension is not registered
    '''
    filename = path.split(os_sep)[-1]
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return None
    return PlayableBase.extensions.get(extension)
249