Passed
Pull Request — main (#228)
by
unknown
02:10
created

pincer.opus.Decoder._set_gain()   A

Complexity

Conditions 1

Size

Total Lines 10
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
"""
2
This is a modified version of discord.py's Opus module
3
Copyright (c) 2015-2021 Rapptz
4
Copyright (c) 2021-present Pincer
5
:license: MIT, see LICENSE for details
6
"""
7
8
from __future__ import annotations
9
10
from typing import (
0 ignored issues
show
Bug introduced by
The name TypedDict does not seem to exist in module typing.
Loading history...
Bug introduced by
The name Literal does not seem to exist in module typing.
Loading history...
11
    List,
12
    Tuple,
13
    TypedDict,
14
    Any,
15
    TYPE_CHECKING,
16
    Callable,
17
    TypeVar,
18
    Literal,
19
    Optional,
20
    overload,
21
)
22
23
import array
24
import ctypes
25
import ctypes.util
26
import logging
27
import math
28
import os.path
29
import struct
30
import sys
31
32
from .exceptions import PincerError, InvalidArgument
33
34
if TYPE_CHECKING:
    # These names are only referenced from annotations, which stay
    # unevaluated thanks to ``from __future__ import annotations``.
    T = TypeVar("T")
    BAND_CTL = Literal["narrow", "medium", "wide", "superwide", "full"]
    SIGNAL_CTL = Literal["auto", "voice", "music"]
38
39
40
class BandCtl(TypedDict):
    """Typed mapping from each bandwidth name to its integer CTL value."""

    narrow: int
    medium: int
    wide: int
    superwide: int
    full: int
46
47
48
class SignalCtl(TypedDict):
    """Typed mapping from each signal-type name to its integer CTL value."""

    auto: int
    voice: int
    music: int
52
53
54
# Names exported by ``from <module> import *``.
__all__ = (
    "Encoder",
    "OpusError",
    "OpusNotLoaded",
)
59
60
_log = logging.getLogger(__name__)

# Pointer types used in the libopus function signatures.
c_int_ptr = ctypes.POINTER(ctypes.c_int)
c_int16_ptr = ctypes.POINTER(ctypes.c_int16)
c_float_ptr = ctypes.POINTER(ctypes.c_float)

# Handle to the loaded libopus library; stays None until a load succeeds.
_lib = None
67
68
69
class EncoderStruct(ctypes.Structure):
    """Field-less ctypes structure standing in for the libopus encoder state.

    It is only ever referenced through ``ctypes.POINTER(EncoderStruct)``.
    """
71
72
73
class DecoderStruct(ctypes.Structure):
    """Field-less ctypes structure standing in for the libopus decoder state.

    It is only ever referenced through ``ctypes.POINTER(DecoderStruct)``.
    """
75
76
77
# Pointer types for the encoder/decoder state handles.
EncoderStructPtr = ctypes.POINTER(EncoderStruct)
DecoderStructPtr = ctypes.POINTER(DecoderStruct)
79
80
# Some constants from opus_defines.h

# Error codes
OK = 0
BAD_ARG = -1

# Encoder application modes (passed to opus_encoder_create)
APPLICATION_AUDIO = 2049
APPLICATION_VOIP = 2048
APPLICATION_LOWDELAY = 2051

# Encoder CTL request codes (passed to opus_encoder_ctl)
CTL_SET_BITRATE = 4002
CTL_SET_BANDWIDTH = 4008
CTL_SET_FEC = 4012
CTL_SET_PLP = 4014
CTL_SET_SIGNAL = 4024

# Decoder CTL request codes (passed to opus_decoder_ctl)
CTL_SET_GAIN = 4034
CTL_LAST_PACKET_DURATION = 4039

# Values sent with CTL_SET_BANDWIDTH, keyed by bandwidth name.
band_ctl: BandCtl = {
    "narrow": 1101,
    "medium": 1102,
    "wide": 1103,
    "superwide": 1104,
    "full": 1105,
}

# Values sent with CTL_SET_SIGNAL, keyed by signal-type name.
signal_ctl: SignalCtl = {
    "auto": -1000,
    "voice": 3001,
    "music": 3002,
}
113
114
115
def _err_lt(result: int, func: Callable, args: List) -> int:
    """ctypes ``errcheck`` hook that raises when the result is negative.

    ``args`` is unused, but the parameter has to stay: ctypes invokes
    errcheck callables as ``callable(result, func, arguments)``.

    Returns
    -------
    :class:`int`
        ``result`` unchanged when it is not below ``OK``.

    Raises
    ------
    OpusError
        If ``result`` is below ``OK``.
    """
    if result < OK:
        _log.info("error has happened in %s", func.__name__)
        raise OpusError(result)
    return result
120
121
122
def _err_ne(result: T, func: Callable, args: List) -> T:
    """ctypes ``errcheck`` hook for calls that report status via an out-param.

    The status is read from the last call argument, which the callers in
    this module pass as ``ctypes.byref(ctypes.c_int())``.

    Returns
    -------
    ``result`` unchanged when the status equals ``OK``.

    Raises
    ------
    OpusError
        If the status differs from ``OK``.
    """
    # ``_obj`` holds the object wrapped by byref(); it is read here to get
    # the status integer back out of the call arguments.
    status = args[-1]._obj.value
    if status != OK:
        _log.info("error has happened in %s", func.__name__)
        raise OpusError(status)
    return result
128
129
130
# A list of exported functions.
# The first item is the function name.
# The second is the list of argument types it takes (None leaves them unset).
# The third is the result type.
# The fourth is the error handler (None for no error checking).
exported_functions: List[Tuple[Any, ...]] = [
    # Generic
    ("opus_get_version_string", None, ctypes.c_char_p, None),
    ("opus_strerror", [ctypes.c_int], ctypes.c_char_p, None),
    # Encoder functions
    ("opus_encoder_get_size", [ctypes.c_int], ctypes.c_int, None),
    (
        "opus_encoder_create",
        [ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr],
        EncoderStructPtr,
        _err_ne,
    ),
    (
        "opus_encode",
        [
            EncoderStructPtr,
            c_int16_ptr,
            ctypes.c_int,
            ctypes.c_char_p,
            ctypes.c_int32,
        ],
        ctypes.c_int32,
        _err_lt,
    ),
    (
        "opus_encode_float",
        [
            EncoderStructPtr,
            c_float_ptr,
            ctypes.c_int,
            ctypes.c_char_p,
            ctypes.c_int32,
        ],
        ctypes.c_int32,
        _err_lt,
    ),
    ("opus_encoder_ctl", None, ctypes.c_int32, _err_lt),
    ("opus_encoder_destroy", [EncoderStructPtr], None, None),
    # Decoder functions
    ("opus_decoder_get_size", [ctypes.c_int], ctypes.c_int, None),
    (
        "opus_decoder_create",
        [ctypes.c_int, ctypes.c_int, c_int_ptr],
        DecoderStructPtr,
        _err_ne,
    ),
    (
        "opus_decode",
        [
            DecoderStructPtr,
            ctypes.c_char_p,
            ctypes.c_int32,
            c_int16_ptr,
            ctypes.c_int,
            ctypes.c_int,
        ],
        ctypes.c_int,
        _err_lt,
    ),
    (
        "opus_decode_float",
        [
            DecoderStructPtr,
            ctypes.c_char_p,
            ctypes.c_int32,
            c_float_ptr,
            ctypes.c_int,
            ctypes.c_int,
        ],
        ctypes.c_int,
        _err_lt,
    ),
    ("opus_decoder_ctl", None, ctypes.c_int32, _err_lt),
    ("opus_decoder_destroy", [DecoderStructPtr], None, None),
    (
        "opus_decoder_get_nb_samples",
        [DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32],
        ctypes.c_int,
        _err_lt,
    ),
    # Packet functions
    ("opus_packet_get_bandwidth", [ctypes.c_char_p], ctypes.c_int, _err_lt),
    ("opus_packet_get_nb_channels", [ctypes.c_char_p], ctypes.c_int, _err_lt),
    (
        "opus_packet_get_nb_frames",
        [ctypes.c_char_p, ctypes.c_int],
        ctypes.c_int,
        _err_lt,
    ),
    (
        "opus_packet_get_samples_per_frame",
        [ctypes.c_char_p, ctypes.c_int],
        ctypes.c_int,
        _err_lt,
    ),
]
231
232
233
def libopus_loader(name: str) -> Any:
    """Load the shared library ``name`` and configure its opus functions.

    Each entry of ``exported_functions`` is looked up on the loaded library
    and given its argument types, result type and error-check callable.

    Parameters
    ----------
    name: :class:`str`
        The value handed to ``ctypes.cdll.LoadLibrary``.

    Returns
    -------
    The loaded ctypes library object.
    """
    # create the library...
    lib = ctypes.cdll.LoadLibrary(name)

    # register the functions...
    for item in exported_functions:
        func = getattr(lib, item[0])

        try:
            # A falsy argument list leaves argtypes unset (used by the
            # variadic *_ctl functions).
            if item[1]:
                func.argtypes = item[1]

            func.restype = item[2]
        except KeyError:
            pass

        try:
            if item[3]:
                func.errcheck = item[3]
        except KeyError:
            _log.exception("Error assigning check function to %s", func)

    return lib
256
257
258
def _load_default() -> bool:
    """Try to load libopus from its default location.

    On Windows the ``bin/libopus-0.<x64|x86>.dll`` file next to this module
    is used; on other platforms the library is located with
    :func:`ctypes.util.find_library`. Loading is best-effort: on any
    failure ``_lib`` is reset to ``None`` and nothing is raised.

    Returns
    -------
    :class:`bool`
        Whether a library is loaded after the attempt.
    """
    global _lib
    try:
        if sys.platform == "win32":
            _basedir = os.path.dirname(os.path.abspath(__file__))
            # Pointer size in bits selects the matching bundled binary.
            _bitness = struct.calcsize("P") * 8
            _target = "x64" if _bitness > 32 else "x86"
            _filename = os.path.join(
                _basedir, "bin", f"libopus-0.{_target}.dll"
            )
        else:
            _filename = ctypes.util.find_library("opus")
            if _filename is None:
                # Do not hand None to LoadLibrary; treat "not found" as an
                # ordinary load failure instead.
                raise OSError("libopus could not be located")

        _lib = libopus_loader(_filename)
    except Exception:
        # Deliberately broad: a failed default load must never raise.
        _log.debug("Could not load the default opus library", exc_info=True)
        _lib = None

    return _lib is not None
275
276
277
def load_opus(name: str) -> None:
    """Loads the libopus shared library for use with voice.

    If this function is not called then the library uses the function
    :func:`ctypes.util.find_library` and then loads that one if available.

    Not loading a library and attempting to use PCM based AudioSources will
    lead to voice not working.

    This function propagates the exceptions thrown.

    .. warning::
        The bitness of the library must match the bitness of your python
        interpreter. If the library is 64-bit then your python interpreter
        must be 64-bit as well. Usually if there's a mismatch in bitness then
        the load will throw an exception.

    .. note::
        On Windows, this function should not need to be called as the binaries
        are automatically loaded.

    .. note::
        On Windows, the .dll extension is not necessary. However, on Linux
        the full extension is required to load the library, e.g.
        ``libopus.so.1``. On Linux however, :func:`ctypes.util.find_library`
        will usually find the library automatically without you having to
        call this.

    Parameters
    ----------
    name: :class:`str`
        The filename of the shared library.
    """
    global _lib
    _lib = libopus_loader(name)
304
305
306
def is_loaded() -> bool:
    """Function to check if opus lib is successfully loaded either
    via the :func:`ctypes.util.find_library` call or :func:`load_opus`.

    This must return ``True`` for voice to work.

    Returns
    -------
    :class:`bool`
        Indicates if the opus library has been loaded.
    """
    # Read-only access to the module global, so no ``global`` statement.
    return _lib is not None
317
318
319
class OpusError(PincerError):
    """An exception that is thrown for libopus related errors.

    Attributes
    ----------
    code: :class:`int`
        The error code returned.
    """

    def __init__(self, code: int):
        self.code: int = code
        # libopus provides the human-readable message for the code.
        msg = _lib.opus_strerror(self.code).decode("utf-8")
        _log.info('"%s" has happened', msg)
        super().__init__(msg)
332
333
334
class OpusNotLoaded(PincerError):
    """An exception that is thrown for when libopus is not loaded."""
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
338
339
340
class _OpusStruct:
    """Audio format constants shared by :class:`Encoder` and :class:`Decoder`."""

    SAMPLING_RATE = 48000
    CHANNELS = 2
    FRAME_LENGTH = 20  # in milliseconds
    # Bytes per sample across all channels (16-bit samples).
    SAMPLE_SIZE = struct.calcsize("h") * CHANNELS
    SAMPLES_PER_FRAME = int(SAMPLING_RATE / 1000 * FRAME_LENGTH)

    # Bytes in one frame of PCM data.
    FRAME_SIZE = SAMPLES_PER_FRAME * SAMPLE_SIZE

    @staticmethod
    def get_opus_version() -> str:
        """Return the libopus version string, loading the library if needed.

        Raises
        ------
        OpusNotLoaded
            If no library is loaded and the default load fails.
        """
        if not is_loaded() and not _load_default():
            raise OpusNotLoaded()

        return _lib.opus_get_version_string().decode("utf-8")
355
356
357
class Encoder(_OpusStruct):
    """Wrapper around a libopus encoder state.

    Parameters
    ----------
    application: :class:`int`
        The application mode handed to ``opus_encoder_create``.
        Defaults to ``APPLICATION_AUDIO``.
    """

    def __init__(self, application: int = APPLICATION_AUDIO):
        # Makes sure libopus is loaded; raises OpusNotLoaded otherwise.
        _OpusStruct.get_opus_version()

        self.application: int = application
        self._state: EncoderStruct = self._create_state()
        self.set_bitrate(128)
        self.set_fec(True)
        self.set_expected_packet_loss_percent(0.15)
        self.set_bandwidth("full")
        self.set_signal_type("auto")

    def __del__(self) -> None:
        # ``_state`` is missing when __init__ failed before creating it.
        if hasattr(self, "_state"):
            _lib.opus_encoder_destroy(self._state)
            # This is a destructor, so it's okay to assign None
            self._state = None  # type: ignore

    def _create_state(self) -> EncoderStruct:
        """Create the libopus encoder state for this encoder's settings."""
        # Out-parameter for the status code; checked by the errcheck hook.
        ret = ctypes.c_int()
        return _lib.opus_encoder_create(
            self.SAMPLING_RATE,
            self.CHANNELS,
            self.application,
            ctypes.byref(ret),
        )

    def set_bitrate(self, kbps: int) -> int:
        """Set the encoder bitrate, clamped to the range 16-512.

        Returns
        -------
        :class:`int`
            The clamped value that was applied.
        """
        kbps = min(512, max(16, int(kbps)))

        _lib.opus_encoder_ctl(self._state, CTL_SET_BITRATE, kbps * 1024)
        return kbps

    def set_bandwidth(self, req: BAND_CTL) -> None:
        """Set the encoder bandwidth by name.

        Raises
        ------
        KeyError
            If ``req`` is not a key of ``band_ctl``.
        """
        if req not in band_ctl:
            raise KeyError(
                f'{req!r} is not a valid bandwidth setting. Try one of: {",".join(band_ctl)}'
            )

        k = band_ctl[req]
        _lib.opus_encoder_ctl(self._state, CTL_SET_BANDWIDTH, k)

    def set_signal_type(self, req: SIGNAL_CTL) -> None:
        """Set the encoder signal type by name.

        Raises
        ------
        KeyError
            If ``req`` is not a key of ``signal_ctl``.
        """
        if req not in signal_ctl:
            raise KeyError(
                f'{req!r} is not a valid signal type setting. Try one of: {",".join(signal_ctl)}'
            )

        k = signal_ctl[req]
        _lib.opus_encoder_ctl(self._state, CTL_SET_SIGNAL, k)

    def set_fec(self, enabled: bool = True) -> None:
        """Enable or disable forward error correction (``CTL_SET_FEC``)."""
        _lib.opus_encoder_ctl(self._state, CTL_SET_FEC, 1 if enabled else 0)

    def set_expected_packet_loss_percent(self, percentage: float) -> None:
        """Set the expected packet loss.

        ``percentage`` is a fraction (``0.15`` means 15%); it is converted
        to a whole percent and clamped to 0-100 before being applied.
        """
        _lib.opus_encoder_ctl(
            self._state, CTL_SET_PLP, min(100, max(0, int(percentage * 100)))
        )

    def encode(self, pcm: bytes, frame_size: int) -> bytes:
        """Encode PCM data into an Opus packet.

        Parameters
        ----------
        pcm: :class:`bytes`
            The PCM data, handed to ``opus_encode`` as 16-bit samples.
        frame_size: :class:`int`
            The frame size handed to ``opus_encode``.

        Returns
        -------
        :class:`bytes`
            The encoded packet.
        """
        # The output buffer is sized to the length of the input.
        max_data_bytes = len(pcm)
        # bytes can be used to reference pointer
        pcm_ptr = ctypes.cast(pcm, c_int16_ptr)  # type: ignore
        data = (ctypes.c_char * max_data_bytes)()

        ret = _lib.opus_encode(
            self._state, pcm_ptr, frame_size, data, max_data_bytes
        )

        # array can be initialized with bytes but mypy doesn't know
        return array.array("b", data[:ret]).tobytes()  # type: ignore
428
429
430
class Decoder(_OpusStruct):
    """Wrapper around a libopus decoder state.

    The state is created for ``SAMPLING_RATE`` and ``CHANNELS``.
    """

    def __init__(self):
        # Makes sure libopus is loaded; raises OpusNotLoaded otherwise.
        _OpusStruct.get_opus_version()

        self._state: DecoderStruct = self._create_state()

    def __del__(self) -> None:
        # ``_state`` is missing when __init__ failed before creating it.
        if hasattr(self, "_state"):
            _lib.opus_decoder_destroy(self._state)
            # This is a destructor, so it's okay to assign None
            self._state = None  # type: ignore

    def _create_state(self) -> DecoderStruct:
        """Create the libopus decoder state."""
        # Out-parameter for the status code; checked by the errcheck hook.
        ret = ctypes.c_int()
        return _lib.opus_decoder_create(
            self.SAMPLING_RATE, self.CHANNELS, ctypes.byref(ret)
        )

    @staticmethod
    def packet_get_nb_frames(data: bytes) -> int:
        """Gets the number of frames in an Opus packet"""
        return _lib.opus_packet_get_nb_frames(data, len(data))

    @staticmethod
    def packet_get_nb_channels(data: bytes) -> int:
        """Gets the number of channels in an Opus packet"""
        return _lib.opus_packet_get_nb_channels(data)

    @classmethod
    def packet_get_samples_per_frame(cls, data: bytes) -> int:
        """Gets the number of samples per frame from an Opus packet"""
        return _lib.opus_packet_get_samples_per_frame(data, cls.SAMPLING_RATE)

    def _set_gain(self, adjustment: int) -> int:
        """Configures decoder gain adjustment.

        Scales the decoded output by a factor specified in Q8 dB units.
        This has a maximum range of -32768 to 32767 inclusive, and returns
        OPUS_BAD_ARG (-1) otherwise. The default is zero indicating no adjustment.
        This setting survives decoder reset (irrelevant for now).

        gain = 10**(x/(20.0*256))

        (from opus_defines.h)
        """
        return _lib.opus_decoder_ctl(self._state, CTL_SET_GAIN, adjustment)

    def set_gain(self, dB: float) -> int:
        """Sets the decoder gain in dB, from -128 to 128."""

        dB_Q8 = max(
            -32768, min(32767, round(dB * 256))
        )  # dB * 2^n where n is 8 (Q8)
        return self._set_gain(dB_Q8)

    def set_volume(self, mult: float) -> int:
        """Sets the output volume as a float percent, i.e. 0.5 for 50%, 1.75 for 175%, etc.

        Raises
        ------
        ValueError
            If ``mult`` is not greater than zero.
        """
        if mult <= 0:
            # log10 is undefined here; raise the same exception type with a
            # message that names the actual problem.
            raise ValueError("volume multiplier must be greater than 0")
        return self.set_gain(20 * math.log10(mult))  # amplitude ratio

    def _get_last_packet_duration(self) -> int:
        """Gets the duration (in samples) of the last packet successfully decoded or concealed."""

        ret = ctypes.c_int32()
        _lib.opus_decoder_ctl(
            self._state, CTL_LAST_PACKET_DURATION, ctypes.byref(ret)
        )
        return ret.value

    @overload
    def decode(self, data: bytes, *, fec: bool) -> bytes:
        ...

    @overload
    def decode(self, data: Literal[None], *, fec: Literal[False]) -> bytes:
        ...

    def decode(self, data: Optional[bytes], *, fec: bool = False) -> bytes:
        """Decode an Opus packet into 16-bit PCM.

        Parameters
        ----------
        data: Optional[:class:`bytes`]
            The Opus packet. ``None`` is forwarded to ``opus_decode`` as a
            null packet of length 0; the frame size then falls back to the
            last packet duration, or ``SAMPLES_PER_FRAME`` if that is 0.
        fec: :class:`bool`
            Forwarded to ``opus_decode`` as its FEC flag.

        Returns
        -------
        :class:`bytes`
            The decoded PCM data.

        Raises
        ------
        InvalidArgument
            If ``data`` is ``None`` while ``fec`` is set.
        """
        if data is None and fec:
            raise InvalidArgument(
                "Invalid arguments: FEC cannot be used with null data"
            )

        if data is None:
            frame_size = (
                self._get_last_packet_duration() or self.SAMPLES_PER_FRAME
            )
        else:
            frames = self.packet_get_nb_frames(data)
            samples_per_frame = self.packet_get_samples_per_frame(data)
            frame_size = frames * samples_per_frame

        # The decoder state was created with self.CHANNELS channels, so the
        # output buffer must be sized for that count. Sizing it from the
        # packet's own channel count would under-allocate for mono packets.
        channel_count = self.CHANNELS

        pcm = (ctypes.c_int16 * (frame_size * channel_count))()
        pcm_ptr = ctypes.cast(pcm, c_int16_ptr)

        ret = _lib.opus_decode(
            self._state,
            data,
            len(data) if data else 0,
            pcm_ptr,
            frame_size,
            fec,
        )

        return array.array("h", pcm[: ret * channel_count]).tobytes()
533