Test Failed
Push — master ( 183265...afa1da )
by Nicolas
03:15 queued 16s
created

ThreadedIterableStreamer.stop()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
8
import threading
9
import time
10
11
from glances.logger import logger
12
13
14
class ThreadedIterableStreamer:
    """
    Utility class to stream an iterable using a background / daemon Thread

    The worker thread is started by the constructor. It consumes the iterable
    one item at a time and keeps only the most recent item.

    Use `ThreadedIterableStreamer.stats` to access the latest streamed results.
    The worker holds `result_lock` while it stores a new item, so a reader that
    needs a consistent view can hold `result_lock` while reading `stats`.
    """

    def __init__(self, iterable, initial_stream_value=None, sleep_duration=0.1):
        """
        iterable: an Iterable instance that needs to be streamed
        initial_stream_value: value returned by `stats` until the first item is streamed
        sleep_duration: pause (in seconds) between two iterations of the worker
        """
        self._iterable = iterable
        # Iterable results are stored here
        self._raw_result = initial_stream_value
        # Use a Thread to stream iterable (daemon=True to automatically kill thread when main process dies)
        self._thread = threading.Thread(target=self._stream_results, daemon=True)
        # Event needed to stop the thread manually
        self._stopper = threading.Event()
        # Lock to avoid the daemon thread updating stats when main thread reads the stats
        self.result_lock = threading.Lock()
        # Last result streamed time (initial val 0)
        self._last_update_time = 0
        # Time to sleep before next iteration
        self._sleep_duration = sleep_duration

        # Every attribute read by the worker is set above, so it is safe to start now
        self._thread.start()

    def stop(self):
        """Ask the worker thread to stop (it exits at its next stop check)."""
        self._stopper.set()

    def stopped(self):
        """Return True if a stop has been requested (or the worker stopped itself on error)."""
        return self._stopper.is_set()

    def _stream_results(self):
        """Grab the stats.

        Thread target: loops over the iterable until it is exhausted, until
        stop() is called, or until the iterable raises.
        """
        try:
            for res in self._iterable:
                self._pre_update_hook()
                self._raw_result = res
                self._post_update_hook()

                # Wait on the stop event instead of a plain sleep so that a
                # call to stop() interrupts the pause immediately. wait()
                # returns True as soon as the event is set.
                if self._stopper.wait(self._sleep_duration):
                    break

        except Exception as e:
            logger.debug(f"docker plugin - Exception thrown during run ({e})")
            self.stop()

    def _pre_update_hook(self):
        """Hook that runs before worker thread updates the raw_stats (takes the result lock)"""
        self.result_lock.acquire()

    def _post_update_hook(self):
        """Hook that runs after worker thread updates the raw_stats (stamps the time, frees the lock)"""
        self._last_update_time = time.time()
        self.result_lock.release()

    @property
    def stats(self):
        """Raw Stats getter: the latest streamed item (or the initial value)."""
        return self._raw_result

    @property
    def last_update_time(self):
        """Timestamp (time.time()) of the last streamed item, 0 if none was streamed yet."""
        return self._last_update_time
86