AudioSegmenter._segment()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 50
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 6.3324

Importance

Changes 0
Metric Value
cc 4
eloc 20
nop 3
dl 0
loc 50
ccs 9
cts 19
cp 0.4737
crap 6.3324
rs 9.4
c 0
b 0
f 0
1 1
import logging
2 1
import os
3 1
import tempfile
4 1
import time
5 1
from typing import List, Optional
6
7 1
from music_album_creation.ffmpeg import FFMPEG
8
9 1
from .data import SegmentationInformation
10
11 1
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
12
13
14 1
# Shared FFMPEG wrapper instance, called by AudioSegmenter._segment.
# It is built from the MUSIC_FFMPEG environment variable, defaulting to 'ffmpeg'
# (presumably the ffmpeg executable name/path -- confirm against the FFMPEG class).
ffmpeg = FFMPEG(os.environ.get('MUSIC_FFMPEG', 'ffmpeg'))
15
16 1
# File extension appended to every output track file name.
EXT = 'mp3'
17 1
18
19 1
class AudioSegmenter(object):
    """Split an album audio file into one audio file per track using ffmpeg.

    Created tracks are stored inside ``target_directory`` and are named
    ``<track name>.<EXT>``.
    """

    def __init__(self, target_directory=tempfile.gettempdir()):
        self._dir = target_directory

    @property
    def target_directory(self):
        """The directory path that will serve as the destination for storing created tracks"""
        return self._dir

    @target_directory.setter
    def target_directory(self, directory_path):
        self._dir = directory_path

    def _trans(self, track_info):
        """Create (file) name of output track.

        :param list track_info: track name followed by its timestamp(s)
        :return: a new list where the track name is replaced by the path of
            the output file inside the target directory
        """
        return [os.path.join(self._dir, f'{track_info[0]}.{EXT}')] + track_info[1:]

    def segment(
        self,
        album_file,
        data,
        sleep_seconds=0,
        supress_stdout=True,
        supress_stderr=True,
    ):
        """Segment the album file into tracks, one ffmpeg call per track.

        :param album_file: path of the audio file holding the whole album
        :param data: sized iterable of tracks; each track converts to a list
            holding the track name, the starting timestamp and, optionally,
            the ending timestamp (a missing end means "until the end of the
            album")
        :param float sleep_seconds: seconds to sleep before segmenting each
            track, except the last one
        :param bool supress_stdout: accepted so that `segment_from_file` can
            forward it; currently not passed on to ffmpeg
        :param bool supress_stderr: accepted so that `segment_from_file` can
            forward it; currently not passed on to ffmpeg
        :return: paths of the created track files, in the order of `data`
        :rtype: list
        :raises FfmpegCommandError: if ffmpeg exits with a non-zero code
        """
        tracks = [list(track) for track in data]
        if not tracks:
            # nothing to cut: do not invoke ffmpeg at all
            return []

        last_index = len(tracks) - 1
        track_files: List[str] = []
        for index, track in enumerate(tracks):
            if index < last_index:
                time.sleep(sleep_seconds)
            output_args = self._trans(track)
            self._run_segment(album_file, output_args)
            track_files.append(output_args[0])
        # TODO: remove the need to hard-code extension. should know from download module
        # if we download youtube video as mp4 audio, then each track should probably be mp4 too
        # if we download youtube video as mp3 audio, then each track should probably be mp3 too
        return track_files

    def _run_segment(self, album_file, track_args):
        """Run ffmpeg for a single track; log and raise if it fails.

        :param album_file: path of the audio file holding the whole album
        :param list track_args: output file path, start and (optional) end timestamp
        :return: the object returned by the ffmpeg call
        :raises FfmpegCommandError: if ffmpeg exits with a non-zero code
        """
        result = self._segment(album_file, *track_args)
        if result.exit_code != 0:
            logger.error("Fmmpeg exit code: %s", result.exit_code)
            logger.error("Ffmpeg st out: %s", result.stdout)
            logger.error("Fmmpeg stderr: %s", result.stderr)
            raise FfmpegCommandError("Command '{}' failed".format(' '.join(self._args)))
        return result

    def segment_from_file(
        self,
        album_file,
        tracks_file,
        hhmmss_type,
        supress_stdout=True,
        supress_stderr=True,
        sleep_seconds=0,
    ):
        """
        Assumes the hhmmss are starting timestamps. Given an album audio file and a file with track information, segments the audio file into audio tracks which get stored in the 'self.target_directory' folder.\n
        :param str album_file: path of the audio file holding the whole album
        :param str tracks_file: path of the text file holding the tracks information
        :param str hhmmss_type: forwarded to SegmentationInformation.from_multiline
        :param bool supress_stdout: forwarded to `segment`
        :param bool supress_stderr: forwarded to `segment`
        :param float sleep_seconds: forwarded to `segment`
        :return: paths of the created track files
        :raises FfmpegCommandError: if ffmpeg exits with a non-zero code
        """
        with open(tracks_file, 'r') as f:
            segmentation_info = SegmentationInformation.from_multiline(
                f.read().strip(), hhmmss_type
            )
        return self.segment(
            album_file,
            segmentation_info,
            supress_stdout=supress_stdout,
            supress_stderr=supress_stderr,
            sleep_seconds=sleep_seconds,
        )

    def _segment(self, *args, **kwargs):
        """Invoke ffmpeg to cut a single track out of the album file.

        Positional arguments, in order: album file, output track file,
        starting timestamp and, optionally, ending timestamp. The arguments
        handed to ffmpeg are also stored in ``self._args``.

        :return: the object returned by the ffmpeg call
        """
        album_file = args[0]
        track_file = args[1]
        start = args[2]  # starting timestamp

        # if it is the last segment then end timestamp is the end of the album
        # and in that case the client code need to supply 3 arguments (not 4)
        end: Optional[str] = args[3] if len(args) > 3 else None
        to_option = ['-to', str(end)] if end else []

        self._args = (
            '-y',  # overwrite an already existing output file
            '-i',
            str(album_file),  # youtube downloaded audio stream (ie local mp4 file)
            '-ss',
            str(start),  # stringified so that the command can be joined/logged
            *to_option,
            '-f',  # force the output format
            'mp3',
            str(track_file),
        )
        logger.info("Segmenting: ffmpeg '%s'", ' '.join(self._args))
        return ffmpeg(*self._args)
157
158
159
class FfmpegCommandError(Exception):
    """Raised when an ffmpeg command exits with a non-zero exit code."""
161
162
163
# Command-line entry point: segment AUDIO_FILE according to TRACKS_FILE.
if __name__ == '__main__':
    import sys

    if len(sys.argv) < 3:
        print('Usage: python3 Album_segmentation.py AUDIO_FILE TRACKS_FILE [HHMMSS_TYPE]')
        sys.exit(1)

    # segment_from_file requires an hhmmss_type, so take it from the optional
    # third command-line argument.
    # NOTE(review): the 'timestamps' fallback is an assumption based on the
    # segment_from_file docstring ("starting timestamps") -- confirm it is a
    # value accepted by SegmentationInformation.from_multiline.
    hhmmss_type = sys.argv[3] if len(sys.argv) > 3 else 'timestamps'

    try:
        audio_segmenter = AudioSegmenter()
        audio_segmenter.segment_from_file(
            sys.argv[1],
            sys.argv[2],
            hhmmss_type,
            supress_stdout=True,
            supress_stderr=True,
            sleep_seconds=0.45,
        )
    except Exception as e:
        # top-level boundary: report the failure and signal it via the exit code
        print(e)
        sys.exit(1)
182