music_album_creation.ffprobe_client   A
last analyzed

Complexity

Total Complexity 3

Size/Duplication

Total Lines 41
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 32
dl 0
loc 41
rs 10
c 0
b 0
f 0
wmc 3

2 Methods

Rating   Name   Duplication   Size   Complexity  
A FFProbeCallable.__call__() 0 2 1
A FFProbeClient.get_stream_info() 0 19 2
1
import json
2
from typing import Any, Dict, Protocol
3
4
from attr import define
5
6
7
class CLIResult(Protocol):
8
    exit_code: int
9
    stdout: str
10
    stderr: str
11
12
13
class FFProbeCallable(Protocol):
14
    def __call__(self, *args: str) -> CLIResult:
15
        ...
16
17
18
@define
19
class FFProbeClient:
20
    ffprobe: FFProbeCallable
21
22
    def get_stream_info(self, file_path: str) -> Dict[str, Any]:
23
        cli_result = self.ffprobe(
24
            '-v',
25
            'error',
26
            '-show_entries',
27
            'stream_tags=rotate:format=size,duration:stream=codec_name,bit_rate,sample_rate,channels,nb_frames',
28
            '-of',
29
            'default=noprint_wrappers=1',
30
            '-show_format',
31
            '-print_format',
32
            'json',
33
            str(file_path),
34
        )
35
        if cli_result.exit_code != 0:
36
            raise RuntimeError(f"ffprobe failed with exit code {cli_result.exit_code}")
37
        assert cli_result.exit_code == 0
38
        assert cli_result.stderr == ''
39
        res = cli_result.stdout
40
        return json.loads(res)
41