Total Complexity | 3 |
Total Lines | 41 |
Duplicated Lines | 0 % |
Changes | 0 |
1 | import json |
||
2 | from typing import Any, Dict, Protocol |
||
3 | |||
4 | from attr import define |
||
5 | |||
6 | |||
7 | class CLIResult(Protocol): |
||
8 | exit_code: int |
||
9 | stdout: str |
||
10 | stderr: str |
||
11 | |||
12 | |||
13 | class FFProbeCallable(Protocol): |
||
14 | def __call__(self, *args: str) -> CLIResult: |
||
15 | ... |
||
16 | |||
17 | |||
18 | @define |
||
19 | class FFProbeClient: |
||
20 | ffprobe: FFProbeCallable |
||
21 | |||
22 | def get_stream_info(self, file_path: str) -> Dict[str, Any]: |
||
23 | cli_result = self.ffprobe( |
||
24 | '-v', |
||
25 | 'error', |
||
26 | '-show_entries', |
||
27 | 'stream_tags=rotate:format=size,duration:stream=codec_name,bit_rate,sample_rate,channels,nb_frames', |
||
28 | '-of', |
||
29 | 'default=noprint_wrappers=1', |
||
30 | '-show_format', |
||
31 | '-print_format', |
||
32 | 'json', |
||
33 | str(file_path), |
||
34 | ) |
||
35 | if cli_result.exit_code != 0: |
||
36 | raise RuntimeError(f"ffprobe failed with exit code {cli_result.exit_code}") |
||
37 | assert cli_result.exit_code == 0 |
||
38 | assert cli_result.stderr == '' |
||
39 | res = cli_result.stdout |
||
40 | return json.loads(res) |
||
41 |