GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

PcapLoop.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 7
Bugs 1 Features 0
Metric Value
cc 1
c 7
b 1
f 0
dl 0
loc 5
rs 9.4285
1
import time
2
from queue import Queue
3
from struct import unpack, error
4
from threading import Thread, Event
5
6
from libAnt.drivers.driver import Driver
7
from libAnt.loggers.logger import Logger
8
9
10
class PcapDriver(Driver):
11
    def __init__(self, pcap : str, logger: Logger = None):
12
        super().__init__(logger=logger)
13
        self._isopen = False
14
        self._pcap = pcap
15
        self._buffer = Queue()
16
17
        self._loop = None
18
19
    class PcapLoop(Thread):
20
        def __init__(self, pcap, buffer: Queue):
21
            super().__init__()
22
            self._stopper = Event()
23
            self._pcap = pcap
24
            self._buffer = buffer
25
26
        def stop(self) -> None:
27
            self._stopper.set()
28
29
        def run(self) -> None:
30
            self._pcapfile = open(self._pcap, 'rb')
31
            # move file pointer to first packet header
32
            global_header_length = 24
33
            self._pcapfile.seek(global_header_length, 0)
34
35
            first_ts = 0
36
            start_time = time.time()
37
            while not self._stopper.is_set():
38
                try:
39
                    ts_sec, = unpack('i', self._pcapfile.read(4))
40
                except error:
41
                    break
42
                ts_usec = unpack('i', self._pcapfile.read(4))[0] / 1000000
43
44
                if first_ts is 0:
45
                    first_ts = ts_sec + ts_usec
46
47
                ts = ts_sec + ts_usec
48
                send_time = ts - first_ts
49
                elapsed_time = time.time() - start_time
50
                if send_time > (elapsed_time):
51
                    sleep_time = send_time - elapsed_time
52
                    time.sleep(sleep_time)
53
54
                packet_length = unpack('i', self._pcapfile.read(4))[0]
55
                self._pcapfile.seek(4, 1)
56
                for i in range(packet_length):
57
                    self._buffer.put(self._pcapfile.read(1))
58
59
            self._pcapfile.close()
60
61
    def _isOpen(self) -> bool:
62
        return self._isopen
63
64
    def _open(self) -> None:
65
        self._isopen = True
66
        self._loop = self.PcapLoop(self._pcap, self._buffer)
67
        self._loop.start()
68
69
    def _close(self) -> None:
70
        self._isopen = False
71
        if self._loop is not None:
72
            if self._loop.is_alive():
73
                self._loop.stop()
74
                self._loop.join()
75
        self._loop = None
76
77
    def _read(self, count: int, timeout=None) -> bytes:
78
        result = bytearray()
79
80
        while len(result) < count:
81
            result += self._buffer.get(block=True, timeout=timeout)
82
83
        return bytes(result)
84
85
    def _write(self, data: bytes) -> None:
86
        pass
87