1
|
|
|
""" |
2
|
|
|
This is a modified version of discord.py's OggParse module |
3
|
|
|
Copyright (c) 2015-2021 Rapptz |
4
|
|
|
Copyright (c) 2021-present Pincer |
5
|
|
|
:license: MIT, see LICENSE for details |
6
|
|
|
""" |
7
|
|
|
|
8
|
|
|
from __future__ import annotations |
9
|
|
|
|
10
|
|
|
import struct |
11
|
|
|
|
12
|
|
|
from typing import TYPE_CHECKING, ClassVar, IO, Generator, Tuple, Optional |
13
|
|
|
|
14
|
|
|
from .exceptions import PincerError |
15
|
|
|
|
16
|
|
|
__all__ = ( |
17
|
|
|
'OggError', |
18
|
|
|
'OggPage', |
19
|
|
|
'OggStream', |
20
|
|
|
) |
21
|
|
|
|
22
|
|
|
class OggError(PincerError): |
23
|
|
|
"""An exception that is thrown for Ogg stream parsing errors.""" |
24
|
|
|
pass |
|
|
|
|
25
|
|
|
|
26
|
|
|
# https://tools.ietf.org/html/rfc3533 |
27
|
|
|
# https://tools.ietf.org/html/rfc7845 |
28
|
|
|
|
29
|
|
|
class OggPage: |
|
|
|
|
30
|
|
|
_header: ClassVar[struct.Struct] = struct.Struct('<xBQIIIB') |
31
|
|
|
if TYPE_CHECKING: |
32
|
|
|
flag: int |
33
|
|
|
gran_pos: int |
34
|
|
|
serial: int |
35
|
|
|
pagenum: int |
36
|
|
|
crc: int |
37
|
|
|
segnum: int |
38
|
|
|
|
39
|
|
|
def __init__(self, stream: IO[bytes]) -> None: |
40
|
|
|
try: |
41
|
|
|
header = stream.read(struct.calcsize(self._header.format)) |
42
|
|
|
|
43
|
|
|
self.flag, self.gran_pos, self.serial, \ |
44
|
|
|
self.pagenum, self.crc, self.segnum = self._header.unpack(header) |
45
|
|
|
|
46
|
|
|
self.segtable: bytes = stream.read(self.segnum) |
47
|
|
|
bodylen = sum(struct.unpack('B'*self.segnum, self.segtable)) |
48
|
|
|
self.data: bytes = stream.read(bodylen) |
49
|
|
|
except Exception: |
50
|
|
|
raise OggError('bad data stream') from None |
51
|
|
|
|
52
|
|
|
def iter_packets(self) -> Generator[Tuple[bytes, bool], None, None]: |
|
|
|
|
53
|
|
|
packetlen = offset = 0 |
54
|
|
|
partial = True |
55
|
|
|
|
56
|
|
|
for seg in self.segtable: |
57
|
|
|
if seg == 255: |
58
|
|
|
packetlen += 255 |
59
|
|
|
partial = True |
60
|
|
|
else: |
61
|
|
|
packetlen += seg |
62
|
|
|
yield self.data[offset:offset+packetlen], True |
63
|
|
|
offset += packetlen |
64
|
|
|
packetlen = 0 |
65
|
|
|
partial = False |
66
|
|
|
|
67
|
|
|
if partial: |
68
|
|
|
yield self.data[offset:], False |
69
|
|
|
|
70
|
|
|
class OggStream: |
|
|
|
|
71
|
|
|
def __init__(self, stream: IO[bytes]) -> None: |
72
|
|
|
self.stream: IO[bytes] = stream |
73
|
|
|
|
74
|
|
|
def _next_page(self) -> Optional[OggPage]: |
75
|
|
|
head = self.stream.read(4) |
76
|
|
|
if head == b'OggS': |
|
|
|
|
77
|
|
|
return OggPage(self.stream) |
78
|
|
|
elif not head: |
79
|
|
|
return None |
80
|
|
|
else: |
81
|
|
|
raise OggError('invalid header magic') |
82
|
|
|
|
83
|
|
|
def _iter_pages(self) -> Generator[OggPage, None, None]: |
84
|
|
|
page = self._next_page() |
85
|
|
|
while page: |
86
|
|
|
yield page |
87
|
|
|
page = self._next_page() |
88
|
|
|
|
89
|
|
|
def iter_packets(self) -> Generator[bytes, None, None]: |
|
|
|
|
90
|
|
|
partial = b'' |
91
|
|
|
for page in self._iter_pages(): |
92
|
|
|
for data, complete in page.iter_packets(): |
93
|
|
|
partial += data |
94
|
|
|
if complete: |
95
|
|
|
yield partial |
96
|
|
|
partial = b'' |
|
|
|
|