Passed
Push — master ( 798fa3...b3fabc )
by Amin
59s
created

ffmpeg_streaming._media   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 237
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 124
dl 0
loc 237
rs 8.96
c 0
b 0
f 0
wmc 43

24 Methods

Rating   Name   Duplication   Size   Complexity  
A Save.__init__() 0 14 1
A Save.set_up() 0 3 1
A Save.finish_up() 0 8 3
A Save.__getattr__() 0 8 2
A HLS.encryption() 0 13 3
A Streaming.__init__() 0 6 1
A Streaming.auto_generate_representations() 0 6 1
A Streaming.add_filter() 0 7 2
C Save.output() 0 23 9
A Streaming.representations() 0 2 1
A HLS.set_up() 0 6 2
A Save._run() 0 6 2
A Streaming.watermarking() 0 6 1
A HLS.fragmented_mp4() 0 5 1
A DASH.set_up() 0 2 1
A DASH.generate_hls_playlist() 0 2 1
A Media.__init__() 0 9 1
A HLS.flags() 0 7 2
A HLS.finish_up() 0 8 2
A Media.hls() 0 5 1
A Media.dash() 0 5 1
A HLS.save_master_playlist() 0 8 2
A Stream2File.set_up() 0 2 1
A Media.stream2file() 0 5 1

How to fix   Complexity   

Complexity

Complex classes like ffmpeg_streaming._media often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
ffmpeg_streaming.media
3
~~~~~~~~~~~~
4
5
Media object to build a stream objects
6
7
8
:copyright: (c) 2020 by Amin Yazdanpanah.
9
:website: https://www.aminyazdanpanah.com
10
:email: [email protected]
11
:license: MIT, see LICENSE for more details.
12
"""
13
import abc
14
import os
15
import shutil
16
import tempfile
17
import atexit
18
import asyncio
19
20
from ._clouds import CloudManager
21
from ._command_builder import command_builder
22
from ._format import Format
23
from ._hls_helper import HLSKeyInfoFile, HLSMasterPlaylist
24
from ._process import Process
25
from ._reperesentation import Representation, AutoRep
26
from ._utiles import mkdir, rm
27
from .ffprobe import FFProbe
28
29
30
class Save(abc.ABC):
31
    def __init__(self, media, _format: Format, **options):
32
        """
33
        @TODO: add documentation
34
        """
35
        atexit.register(self.finish_up)
36
37
        self.output_ = ''
38
        self.key = None
39
        self.media = media
40
        self.format = _format
41
        self.options = options
42
        self.pipe = None
43
        self.probe = None
44
        self.output_temp = False
45
46
    def finish_up(self):
47
        """
48
        @TODO: add documentation
49
        """
50
        if self.media.input_temp:
51
            rm(self.media.input)
52
        if self.output_temp:
53
            shutil.rmtree(os.path.dirname(str(self.output_)), ignore_errors=True)
54
55
    @abc.abstractmethod
56
    def set_up(self):
57
        pass
58
59
    def __getattr__(self, name):
60
        def method(*args, **kwargs):
61
            if name in ['save', 'package']:
62
                self.output(*args, **kwargs)
63
            else:
64
                raise AttributeError("The object has no attribute {}".format(name))
65
66
        return method
67
68
    def output(self, output: str = None, clouds: CloudManager = None, monitor: callable = None, ffmpeg_bin: str = 'ffmpeg', **options):
69
        """
70
        @TODO: add documentation
71
        """
72
        if output is None and clouds is None:
73
            self.output_ = self.media.input
74
        elif clouds is not None:
75
            self.output_temp = True
76
            if clouds.filename is None:
77
                clouds.filename = os.path.basename(output if output is not None else self.media.input)
78
            self.output_ = os.path.join(tempfile.mkdtemp(prefix='ffmpeg_streaming_'), clouds.filename)
79
        else:
80
            mkdir(os.path.dirname(output))
81
            self.output_ = output
82
83
        self.set_up()
84
        asyncio.run(self._run(ffmpeg_bin, monitor, **options))
85
86
        if clouds is not None:
87
            clouds.transfer('upload_directory', os.path.dirname(self.output_))
88
89
        if output is not None and self.output_temp:
90
            shutil.move(os.path.dirname(self.output_), os.path.dirname(output))
91
92
    async def _run(self, ffmpeg_bin, monitor: callable = None, **options):
93
        """
94
        @TODO: add documentation
95
        """
96
        with Process(self, command_builder(ffmpeg_bin, self), monitor, **options) as process:
97
            self.pipe, err = process.run()
98
99
100
class Streaming(Save, abc.ABC):
101
    def __init__(self, media, _format: Format, **options):
102
        """
103
        @TODO: add documentation
104
        """
105
        self.reps = list
106
        super(Streaming, self).__init__(media, _format, **options)
107
108
    def representations(self, *reps: Representation):
109
        self.reps = list(reps)
110
111
    def auto_generate_representations(self, heights=None, bitrate=None, ffprobe_bin='ffprobe', include_original=True):
112
        """
113
        @TODO: add documentation
114
        """
115
        self.probe = FFProbe(self.media.input, ffprobe_bin)
116
        self.reps = AutoRep(self.probe.video_size, self.probe.bitrate, self.format, heights, bitrate, include_original)
117
118
    def add_filter(self, *_filter: str):
119
        """
120
        @TODO: add documentation
121
        """
122
        _filters = self.options.pop('filter_complex', None)
123
        _filters = _filters + "," + ",".join(list(_filter)) if _filters is not None else ",".join(list(_filter))
124
        self.options.update({'filter_complex': _filters})
125
126
    def watermarking(self, path, _filter='overlay=10:10'):
127
        """
128
        @TODO: add documentation
129
        """
130
        self.media.inputs.input(path)
131
        self.add_filter(_filter)
132
133
134
class DASH(Streaming):
135
    def set_up(self):
136
        pass
137
138
    def generate_hls_playlist(self):
139
        self.options.update({'hls_playlist': 1})
140
141
142
class HLS(Streaming):
143
    KEY_INFO_FILE_PATH = None
144
    PERIODIC_RE_KEY_FLAG = 'periodic_rekey'
145
    MASTER_PLAYLIST_IS_SAVED = False
146
147
    def set_up(self):
148
        """
149
        @TODO: add documentation
150
        """
151
        if not HLS.MASTER_PLAYLIST_IS_SAVED:
152
            self.save_master_playlist()
153
154
    def encryption(self, path: str, url: str, key_rotation_period: int = 0, needle: str = ".ts' for writing", length: int = 16):
155
        """
156
        @TODO: add documentation
157
        """
158
        with tempfile.NamedTemporaryFile(mode='w', suffix='_py_ff_vi_st.tmp', delete=False) as key_info:
159
            HLS.KEY_INFO_FILE_PATH = key_info.name
160
161
        key_info_file = HLSKeyInfoFile(HLS.KEY_INFO_FILE_PATH, path, url, key_rotation_period, needle, length)
162
        self.options.update({'hls_key_info_file': str(key_info_file)})
163
164
        if key_rotation_period > 0:
165
            setattr(self, 'key_rotation', key_info_file)
166
            self.flags(HLS.PERIODIC_RE_KEY_FLAG)
167
168
    def fragmented_mp4(self):
169
        """
170
        @TODO: add documentation
171
        """
172
        self.options.update({'hls_segment_type': 'fmp4'})
173
174
    def save_master_playlist(self, path=None):
175
        """
176
        @TODO: add documentation
177
        """
178
        if path is not None:
179
            HLS.MASTER_PLAYLIST_IS_SAVED = True
180
181
        HLSMasterPlaylist.generate(self, path)
182
183
    def flags(self, *flags: str):
184
        """
185
        @TODO: add documentation
186
        """
187
        hls_flags = self.options.pop('hls_flags', None)
188
        hls_flags = hls_flags + "+" + "+".join(list(flags)) if hls_flags is not None else "+".join(list(flags))
189
        self.options.update({'hls_flags': hls_flags})
190
191
    def finish_up(self):
192
        """
193
        @TODO: add documentation
194
        """
195
        if HLS.KEY_INFO_FILE_PATH is not None:
196
            rm(HLS.KEY_INFO_FILE_PATH)
197
198
        super(HLS, self).finish_up()
199
200
201
class Stream2File(Save):
202
    """
203
    @TODO: add documentation
204
    """
205
    def set_up(self):
206
        pass
207
208
209
class Media(object):
210
    def __init__(self, _inputs):
211
        """
212
        @TODO: add documentation
213
        """
214
        self.inputs = _inputs
215
216
        first_options = dict(_inputs.inputs[0])
217
        self.input = first_options.get('i', None)
218
        self.input_temp = first_options.get('is_tmp', False)
219
220
    def hls(self, _format: Format, **hls_options):
221
        """
222
        @TODO: add documentation
223
        """
224
        return HLS(self, _format, **hls_options)
225
226
    def dash(self, _format: Format, **dash_options):
227
        """
228
        @TODO: add documentation
229
        """
230
        return DASH(self, _format, **dash_options)
231
232
    def stream2file(self, _format: Format, **options):
233
        """
234
        @TODO: add documentation
235
        """
236
        return Stream2File(self, _format, **options)
237
238