Test Failed
Push — master ( 05aaee...10b5c2 )
by Nicolas
04:12 queued 14s
created

StatsStreamer._pre_update_hook()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
9
import threading
10
import time
11
12
from glances.logger import logger
13
14
15
class StatsStreamer:
16
    """
17
    Utility class to stream an iterable using a background / daemon Thread
18
19
    Use `StatsStreamer.stats` to access the latest streamed results
20
    """
21
22
    def __init__(self, iterable, initial_stream_value=None, sleep_duration=0.1):
23
        """
24
        iterable: an Iterable instance that needs to be streamed
25
        """
26
        self._iterable = iterable
27
        # Iterable results are stored here
28
        self._raw_result = initial_stream_value
29
        # Use a Thread to stream iterable (daemon=True to automatically kill thread when main process dies)
30
        self._thread = threading.Thread(target=self._stream_results, daemon=True)
31
        # Event needed to stop the thread manually
32
        self._stopper = threading.Event()
33
        # Lock to avoid the daemon thread updating stats when main thread reads the stats
34
        self.result_lock = threading.Lock()
35
        # Last result streamed time (initial val 0)
36
        self._last_update_time = 0
37
        # Time to sleep before next iteration
38
        self._sleep_duration = sleep_duration
39
40
        self._thread.start()
41
42
    def stop(self):
43
        """Stop the thread."""
44
        self._stopper.set()
45
46
    def stopped(self):
47
        """Return True is the thread is stopped."""
48
        return self._stopper.is_set()
49
50
    def _stream_results(self):
51
        """Grab the stats.
52
53
        Infinite loop, should be stopped by calling the stop() method
54
        """
55
        try:
56
            for res in self._iterable:
57
                self._pre_update_hook()
58
                self._raw_result = res
59
                self._post_update_hook()
60
61
                time.sleep(self._sleep_duration)
62
                if self.stopped():
63
                    break
64
65
        except Exception as e:
66
            logger.debug("docker plugin - Exception thrown during run ({})".format(e))
67
            self.stop()
68
69
    def _pre_update_hook(self):
70
        """Hook that runs before worker thread updates the raw_stats"""
71
        self.result_lock.acquire()
72
73
    def _post_update_hook(self):
74
        """Hook that runs after worker thread updates the raw_stats"""
75
        self._last_update_time = time.time()
76
        self.result_lock.release()
77
78
    @property
79
    def stats(self):
80
        """Raw Stats getter."""
81
        return self._raw_result
82
83
    @property
84
    def last_update_time(self):
85
        """Raw Stats getter."""
86
        return self._last_update_time
87