GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#5)
by
unknown
01:16
created

PcapLoop   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 46
Duplicated Lines 0 %

Importance

Changes 13
Bugs 1 Features 0
Metric Value
c 13
b 1
f 0
dl 0
loc 46
rs 10
wmc 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
A stop() 0 2 1
D run() 0 35 8
A __init__() 0 6 1
1
import time
2
from queue import Queue
3
from struct import unpack, error
4
from threading import Thread, Event
5
6
from libAnt.drivers.driver import Driver
7
from libAnt.loggers.logger import Logger
8
9
10
class PcapDriver(Driver):
11
    def __init__(self, pcap : str, logger: Logger = None, loopplayback: bool = False):
12
        super().__init__(logger=logger)
13
        self._isopen = False
14
        self._pcap = pcap
15
        self._loopplayback = loopplayback
16
        self._buffer = Queue()
17
18
        self._loop = None
19
20
    class PcapLoop(Thread):
21
        def __init__(self, pcap, buffer: Queue, loopplayback):
22
            super().__init__()
23
            self._stopper = Event()
24
            self._pcap = pcap
25
            self._loopplayback = loopplayback
26
            self._buffer = buffer
27
28
        def stop(self) -> None:
29
            self._stopper.set()
30
31
        def run(self) -> None:
32
            self._pcapfile = open(self._pcap, 'rb')
33
34
            while not self._stopper.is_set():
35
                # move file pointer to first packet header
36
                global_header_length = 24
37
                self._pcapfile.seek(global_header_length, 0)
38
39
                first_ts = 0
40
                start_time = time.time()
41
                while True:
42
                    try:
43
                        ts_sec, = unpack('i', self._pcapfile.read(4))
44
                    except error:
45
                        break
46
                    ts_usec = unpack('i', self._pcapfile.read(4))[0] / 1000000
47
48
                    if first_ts is 0:
49
                        first_ts = ts_sec + ts_usec
50
51
                    ts = ts_sec + ts_usec
52
                    send_time = ts - first_ts
53
                    elapsed_time = time.time() - start_time
54
                    if send_time > elapsed_time:
55
                        sleep_time = send_time - elapsed_time
56
                        time.sleep(sleep_time)
57
58
                    packet_length = unpack('i', self._pcapfile.read(4))[0]
59
                    self._pcapfile.seek(4, 1)
60
                    for i in range(packet_length):
61
                        self._buffer.put(self._pcapfile.read(1))
62
                if not self._loopplayback:
63
                    break
64
65
            self._pcapfile.close()
66
67
    def _isOpen(self) -> bool:
68
        return self._isopen
69
70
    def _open(self) -> None:
71
        self._isopen = True
72
        self._loop = self.PcapLoop(self._pcap, self._buffer, self._loopplayback)
73
        self._loop.start()
74
75
    def _close(self) -> None:
76
        self._isopen = False
77
        if self._loop is not None:
78
            if self._loop.is_alive():
79
                self._loop.stop()
80
                self._loop.join()
81
        self._loop = None
82
83
    def _read(self, count: int, timeout=None) -> bytes:
84
        result = bytearray()
85
86
        while len(result) < count:
87
            result += self._buffer.get(block=True, timeout=timeout)
88
89
        return bytes(result)
90
91
    def _write(self, data: bytes) -> None:
92
        pass
93