Passed
Push — master ( 7bc1c2...5b857f )
by Amin
03:31
created

ffmpeg_streaming._process.Process._thread_mon()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 10
nop 1
dl 0
loc 11
rs 9.9
c 0
b 0
f 0
1
"""
2
ffmpeg_streaming.process
3
~~~~~~~~~~~~
4
5
Run FFmpeg commands and monitor FFmpeg
6
7
8
:copyright: (c) 2020 by Amin Yazdanpanah.
9
:website: https://www.aminyazdanpanah.com
10
:email: [email protected]
11
:license: MIT, see LICENSE for more details.
12
"""
13
14
import shlex
15
import subprocess
16
import threading
17
import logging
18
19
from ffmpeg_streaming._hls_helper import HLSKeyInfoFile
20
from ffmpeg_streaming._utiles import get_time
21
22
23
def _p_open(commands, **options):
    """Log an FFmpeg command line and launch it as a subprocess.

    :param commands: the full command line as a single string; it is
        tokenized with ``shlex.split`` before being executed
    :param options: keyword arguments forwarded unchanged to
        ``subprocess.Popen``
    :return: the ``subprocess.Popen`` object of the started process
    """
    message = "ffmpeg running command: {}".format(commands)
    logging.info(message)

    # Popen receives an argument list, so no shell is involved.
    argv = shlex.split(commands)
    return subprocess.Popen(argv, **options)
26
27
28
class Process(object):
    """Run an FFmpeg command in a subprocess and optionally monitor its output.

    The process is started in ``__init__`` and driven to completion by
    :meth:`run`.  When a ``monitor`` callable is given, or the media object
    carries an ``HLSKeyInfoFile`` in its ``key_rotation`` attribute, the
    output is read line by line in a helper thread; otherwise
    ``Popen.communicate`` is used.

    The instance can be used as a context manager; leaving the ``with``
    block kills the subprocess.
    """

    # Output of the most recent run.  Results are stored per instance and
    # mirrored here so that code reading ``Process.out`` / ``Process.err``
    # keeps working.
    out = None
    err = None

    def __init__(self, media, commands: str, monitor: callable = None, **options):
        """Start the FFmpeg subprocess.

        :param media: object describing the media being processed; its
            optional ``key_rotation`` attribute is inspected
        :param commands: command line string handed to ``_p_open``
        :param monitor: optional callable invoked as
            ``monitor(line, duration, time)`` for every output line
        :param options: ``input`` and ``timeout`` are consumed here; every
            other key is forwarded to ``subprocess.Popen`` and overrides the
            defaults below
        """
        self.is_monitor = False
        self.input = options.pop('input', None)
        self.timeout = options.pop('timeout', None)

        proc_opts = {
            'stdin': None,
            'stdout': subprocess.PIPE,
            'stderr': subprocess.STDOUT,
            'universal_newlines': False,
        }
        # Caller-supplied options win over the defaults.
        proc_opts.update(options)

        if callable(monitor) or self._key_rotation(media) is not None:
            self.is_monitor = True
            # Monitoring compares and parses output as text, so text mode is
            # forced here regardless of what the caller asked for.
            proc_opts.update({
                'stdin': subprocess.PIPE,
                'universal_newlines': True,
            })

        self.process = _p_open(commands, **proc_opts)
        self.media = media
        self.monitor = monitor

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always kill the subprocess when the ``with`` block is left.
        self.process.kill()

    @staticmethod
    def _key_rotation(media):
        """Return ``media.key_rotation`` if it is an ``HLSKeyInfoFile``, else ``None``.

        A media object without a ``key_rotation`` attribute is treated as
        having no key rotation instead of raising ``AttributeError``.
        """
        rotation = getattr(media, 'key_rotation', None)
        if rotation is not None and isinstance(rotation, HLSKeyInfoFile):
            return rotation
        return None

    def _store_output(self, out, err):
        """Record the result on this instance and mirror it on the class."""
        self.out, self.err = out, err
        Process.out, Process.err = out, err

    def _monitor(self):
        """Read the process output line by line until the process has exited.

        Every stripped line is passed to the key-rotation object (if any) and
        to the monitor callable (if any); non-empty lines are collected and
        stored as the run's output.
        """
        duration = 1
        time = 0
        log = []

        while True:
            line = self.process.stdout.readline().strip()
            if line == '' and self.process.poll() is not None:
                break

            if line != '':
                log.append(line)

            rotation = self._key_rotation(self.media)
            if rotation is not None:
                rotation.rotate_key(line)

            if callable(self.monitor):
                # NOTE(review): get_time is assumed to be side-effect free, so
                # each lookup is done once per line and its result reused.
                if duration < 2:
                    found_duration = get_time('Duration: ', line)
                    if found_duration is not None:
                        duration = found_duration

                found_time = get_time('time=', line)
                if found_time:
                    time = found_time

                self.monitor(line, duration, time)

        # stderr is not read separately in monitor mode, so ``err`` is reset
        # rather than left holding the value of an earlier, unrelated run.
        self._store_output(log, None)

    def _thread_mon(self):
        """Run :meth:`_monitor` in a thread and enforce ``self.timeout``.

        :raises RuntimeError: if the monitor thread is still alive after
            ``self.timeout`` seconds; the subprocess is terminated first
        """
        thread = threading.Thread(target=self._monitor)
        thread.start()

        thread.join(self.timeout)
        if thread.is_alive():
            self.process.terminate()
            thread.join()
            error = 'Timeout! exceeded the timeout of {} seconds.'.format(str(self.timeout))
            logging.error(error)
            raise RuntimeError(error)

    def run(self):
        """Wait for the subprocess to finish and return its output.

        :return: tuple ``(out, err)``; in monitor mode ``out`` is the list of
            non-empty output lines and ``err`` is ``None``, otherwise both are
            whatever ``Popen.communicate`` returned
        :raises RuntimeError: if the process exits with a non-zero return
            code, or on timeout in monitor mode
        """
        if self.is_monitor:
            self._thread_mon()
        else:
            out, err = self.process.communicate(self.input, self.timeout)
            self._store_output(out, err)

        if self.process.poll():
            error = str(self.err) if self.err else str(self.out)
            logging.error('ffmpeg failed to execute command: {}'.format(error))
            # NOTE: the two-argument form is kept so ``e.args`` stays unchanged.
            raise RuntimeError('ffmpeg failed to execute command: ', error)

        logging.info("ffmpeg executed command successfully")

        return self.out, self.err
114