ReconnectingPacketBuffer   A
last analyzed

Complexity

Total Complexity 33

Size/Duplication

Total Lines 122
Duplicated Lines 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 122
rs 9.3999
wmc 33

8 Methods

Rating   Name   Duplication   Size   Complexity  
A read() 0 5 3
A connect() 0 10 3
A write() 0 3 2
A __increment_wait_time() 0 4 2
F __run() 0 63 18
A __reset_wait_time() 0 2 1
A close() 0 8 3
A __init__() 0 13 1
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
# These imports are for python3 compatibility inside python2
5
from __future__ import absolute_import
6
from __future__ import division
7
from __future__ import print_function
8
9
import threading
10
import time
11
12
import cachetools
13
14
from apex.aprs import util as aprs_util
15
from .util import echo_colorized_frame
16
from .util import echo_colorized_warning
17
18
19
class NonrepeatingBuffer(object):
    """Wrap a TNC-like object and drop frames that were recently seen.

    Every frame passing through ``read`` or ``write`` is keyed by
    ``str(aprs_util.hash_frame(frame))`` in a ``cachetools.TTLCache``. A frame
    whose key is still in the cache is neither written to the TNC nor
    returned to the reader. Reads and writes share the same cache and the
    same lock.
    """

    def __init__(self, base_tnc, base_name, base_port=None, echo_packets=True, buffer_size=10000, buffer_time=30):
        """Store the wrapped TNC and create the duplicate-detection cache.

        :param base_tnc: object providing ``connect``, ``close``, ``read``
            and ``write``; all I/O is delegated to it.
        :param base_name: name exposed through ``name`` and passed to
            ``echo_colorized_frame``.
        :param base_port: optional port passed to ``base_tnc.write`` when
            truthy.
        :param echo_packets: when true, frames that pass the duplicate
            filter are echoed with ``echo_colorized_frame``.
        :param buffer_size: first argument to ``TTLCache`` (its maxsize).
        :param buffer_time: second argument to ``TTLCache`` (its ttl).
        """
        self.base_tnc = base_tnc
        self.base_name = base_name
        self.base_port = base_port
        self.echo_packets = echo_packets
        self.lock = threading.Lock()
        self.packet_cache = cachetools.TTLCache(buffer_size, buffer_time)

    @property
    def port(self):
        """Return the port given at construction (may be ``None``)."""
        return self.base_port

    @property
    def name(self):
        """Return the name given at construction."""
        return self.base_name

    def connect(self, *args, **kwargs):
        """Forward all arguments to ``base_tnc.connect``."""
        self.base_tnc.connect(*args, **kwargs)

    def close(self, *args, **kwargs):
        """Forward all arguments to ``base_tnc.close``."""
        self.base_tnc.close(*args, **kwargs)

    def write(self, frame, *args, **kwargs):
        """Write ``frame`` to the TNC unless its hash is already cached.

        Extra positional and keyword arguments are accepted but not used.
        """
        with self.lock:
            cache_key = str(aprs_util.hash_frame(frame))
            if cache_key in self.packet_cache:
                # Recently seen (read or written): silently drop it.
                return

            self.packet_cache[cache_key] = frame

            # The port is only passed along when it is truthy.
            write_args = (frame, self.base_port) if self.base_port else (frame,)
            self.base_tnc.write(*write_args)

            if self.echo_packets:
                echo_colorized_frame(frame, self.base_name, False)

    def read(self, *args, **kwargs):
        """Read one frame from the TNC, filtering out recent duplicates.

        :return: the falsy value from ``base_tnc.read`` unchanged when
            nothing was read, the frame when its hash is not cached, or
            ``None`` when the frame is a duplicate.
        """
        with self.lock:
            incoming = self.base_tnc.read(*args, **kwargs)
            if not incoming:
                return incoming

            cache_key = str(aprs_util.hash_frame(incoming))
            if cache_key in self.packet_cache:
                return None

            self.packet_cache[cache_key] = incoming
            if self.echo_packets:
                echo_colorized_frame(incoming, self.base_name, True)
            return incoming
68
69
70
class ReconnectingPacketBuffer(object):
    """Buffer frames around a packet layer and reconnect it after I/O errors.

    ``connect`` starts a background thread that owns the wrapped packet
    layer. That thread connects (retrying with a growing wait time on
    ``IOError``), stores frames it reads in ``from_packet_layer`` and sends
    frames queued in ``to_packet_layer``. ``read`` and ``write`` only touch
    those two caches, guarded by ``self.lock``.
    """

    # Seconds to wait between connection attempts: starting value, upper
    # bound, and the factor applied after each failed attempt.
    STARTING_WAIT_TIME = 2
    MAX_WAIT_TIME = 300
    WAIT_TIME_MULTIPLIER = 2
    MAX_INDEX = 1000000

    def __init__(self, packet_layer):
        """Wrap ``packet_layer``; no connection is made until ``connect``.

        :param packet_layer: object providing ``connect``, ``close``,
            ``read`` and ``write``, signalling failures with ``IOError``.
        """
        self.packet_layer = packet_layer
        # Both queues are TTLCache(10, 30): maxsize 10, ttl 30.
        self.to_packet_layer = cachetools.TTLCache(10, 30)
        self.current_index = 0
        self.from_packet_layer = cachetools.TTLCache(10, 30)
        self.connect_thread = None
        self.lock = threading.Lock()
        self.running = False
        self.reconnect_wait_time = self.STARTING_WAIT_TIME
        self.last_connect_attempt = None
        self.connect_args = None
        self.connect_kwargs = None
        self.connected = False

    def __increment_wait_time(self):
        """Multiply the reconnect wait time, capped at ``MAX_WAIT_TIME``."""
        self.reconnect_wait_time = min(
            self.reconnect_wait_time * self.WAIT_TIME_MULTIPLIER,
            self.MAX_WAIT_TIME)

    def __reset_wait_time(self):
        """Restore the reconnect wait time to ``STARTING_WAIT_TIME``."""
        self.reconnect_wait_time = self.STARTING_WAIT_TIME

    def __disconnect(self):
        """Close the packet layer (best effort) and mark it disconnected."""
        try:
            self.packet_layer.close()
        except IOError:
            # Closing is best effort; the layer is being abandoned anyway.
            pass
        self.connected = False

    def __attempt_connect(self):
        """Connect if the wait time has elapsed, otherwise sleep one second."""
        if not self.last_connect_attempt or time.time() - self.last_connect_attempt > self.reconnect_wait_time:
            try:
                self.last_connect_attempt = time.time()
                self.packet_layer.connect(*self.connect_args, **self.connect_kwargs)
                self.connected = True
            except IOError:
                echo_colorized_warning('Could not connect, will reattempt.')
                self.__disconnect()
                self.__increment_wait_time()
        else:
            time.sleep(1)

    def __read_once(self):
        """Read one frame and buffer it; return True if a frame was stored.

        ``IOError`` from the packet layer propagates to the caller.
        """
        frame = self.packet_layer.read()
        self.__reset_wait_time()
        if not frame:
            return False
        with self.lock:
            self.from_packet_layer[str(aprs_util.hash_frame(frame))] = frame
        return True

    def __write_once(self):
        """Send one queued frame; return True if a frame was written.

        On ``IOError`` the frame is put back in the queue under its own
        hash before the error propagates to the caller.
        """
        pending = None
        with self.lock:
            if self.to_packet_layer:
                pending = self.to_packet_layer.popitem()[1]
        if not pending:
            return False
        try:
            self.packet_layer.write(pending)
        except IOError:
            echo_colorized_warning('Write failed. Will disconnect and attempt to reconnect.')
            # Re-queue the frame that failed, keyed by its own hash and
            # under the lock, so it can be retried after reconnecting.
            with self.lock:
                self.to_packet_layer[str(aprs_util.hash_frame(pending))] = pending
            raise
        self.__reset_wait_time()
        return True

    def __run(self):
        """Worker loop: connect, then alternate one read and one write.

        Runs until ``self.running`` becomes false. Any ``IOError`` during
        read or write closes the packet layer and switches back to
        connecting. When neither a read nor a write happened the loop
        sleeps for one second.
        """
        while self.running:
            if not self.connected:
                self.__attempt_connect()
                continue

            try:
                did_read = self.__read_once()
            except IOError:
                echo_colorized_warning('Read failed. Will disconnect and attempt to reconnect.')
                self.__disconnect()
                continue

            try:
                did_write = self.__write_once()
            except IOError:
                self.__disconnect()
                continue

            if not (did_read or did_write):
                time.sleep(1)

        # Leave a consistent state behind so a later connect() starts by
        # connecting instead of reading from a closed packet layer.
        self.__disconnect()

    def connect(self, *args, **kwargs):
        """Start the worker thread; arguments are passed to the layer's connect.

        Returns immediately; the actual connection is made by the worker.

        :raises RuntimeError: if a worker thread already exists.
        """
        with self.lock:
            if self.connect_thread:
                raise RuntimeError('already connected')

            self.running = True
            self.connect_args = args
            self.connect_kwargs = kwargs
            self.connect_thread = threading.Thread(target=self.__run)
            self.connect_thread.start()

    def close(self):
        """Stop the worker thread and wait for it to finish.

        :raises RuntimeError: if no worker thread exists.
        """
        with self.lock:
            if not self.connect_thread:
                raise RuntimeError('not connected')

            self.running = False
            worker = self.connect_thread

        # Join without holding the lock: the worker takes the same lock to
        # buffer frames and would otherwise never be able to finish.
        worker.join()

        with self.lock:
            if self.connect_thread is worker:
                self.connect_thread = None

    def read(self):
        """Return one buffered frame, or ``None`` when the buffer is empty."""
        with self.lock:
            if self.from_packet_layer:
                return self.from_packet_layer.popitem()[1]
        return None

    def write(self, packet):
        """Queue ``packet`` for the worker thread to send."""
        with self.lock:
            self.to_packet_layer[str(aprs_util.hash_frame(packet))] = packet
192