Completed
Push — master ( b8f070...484cc4 )
by Jeffrey
03:38
created

IGate   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 133
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 133
rs 10
c 1
b 0
f 0
wmc 22

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 8 1
A close() 0 5 2
B connect() 0 27 2
B write() 0 31 4
F read() 0 42 12
A __reset_buffer() 0 3 1
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""APRS Internet Service Class Definitions"""
5
6
# These imports provide Python 3 compatibility when running under Python 2
7
from __future__ import absolute_import
8
from __future__ import division
9
from __future__ import print_function
10
11
import logging
12
import select
13
import socket
14
import threading
15
import time
16
import cachetools
17
import requests
18
19
from apex.aprs import constants as aprs_constants
20
from apex.aprs import util as aprs_util
21
22
# Module metadata: authorship, contact, licensing and credits.
__author__ = 'Jeffrey Phillips Freeman (WI2ARD)'
23
__maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)'
24
# NOTE(review): this value looks like a redaction placeholder rather than a
# real address -- confirm against the upstream file.
__email__ = '[email protected]'
25
__license__ = 'Apache License, Version 2.0'
26
__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors'
27
# No additional credits are listed.
__credits__ = []
28
29
30
class ReconnectingPacketBuffer(object):

    """
    Buffered, automatically reconnecting wrapper around a packet layer.

    A background thread owns the wrapped ``packet_layer``: it connects,
    moves packets between the layer and two small expiring buffers, and
    reconnects with an exponentially growing delay whenever the layer raises
    ``IOError``. Callers only touch the buffers through :meth:`read` and
    :meth:`write`, which never block on network I/O.

    The wrapped object must provide ``connect(*args, **kwargs)``,
    ``close()``, ``read()`` and ``write(packet)``.
    """

    # Reconnect back-off, in seconds.
    STARTING_WAIT_TIME = 2
    MAX_WAIT_TIME = 300
    WAIT_TIME_MULTIPLIER = 2
    # Buffer keys wrap around once they exceed this value.
    MAX_INDEX = 1000000

    def __init__(self, packet_layer):
        """
        :param packet_layer: Object to wrap; see the class docstring for the
            methods it must provide.
        """
        self.packet_layer = packet_layer
        # Both buffers hold at most 10 packets; entries expire after 30 s.
        self.to_packet_layer = cachetools.TTLCache(10, 30)
        self.current_index = 0
        self.from_packet_layer = cachetools.TTLCache(10, 30)
        self.connect_thread = None
        self.lock = threading.Lock()
        self.running = False
        self.reconnect_wait_time = self.STARTING_WAIT_TIME
        self.last_connect_attempt = None
        self.connect_args = None
        self.connect_kwargs = None
        self.connected = False

    def __next_index(self):
        """Return the next buffer key, wrapping around after MAX_INDEX."""
        if self.current_index > self.MAX_INDEX:
            self.current_index = 0
        # Python has no ``++`` operator: ``++x`` is a double unary plus and
        # leaves x unchanged, so the increment must be spelled out.
        self.current_index += 1
        return self.current_index

    def __increment_wait_time(self):
        """Grow the reconnect delay, capped at MAX_WAIT_TIME."""
        self.reconnect_wait_time = min(
            self.reconnect_wait_time * self.WAIT_TIME_MULTIPLIER,
            self.MAX_WAIT_TIME)

    def __reset_wait_time(self):
        """Restore the reconnect delay to its starting value."""
        self.reconnect_wait_time = self.STARTING_WAIT_TIME

    def __close_packet_layer(self):
        """Close the wrapped layer, ignoring I/O errors raised by close()."""
        try:
            self.packet_layer.close()
        except IOError:
            # Best effort: the link is being torn down anyway.
            pass

    def __drop_connection(self):
        """Close the wrapped layer and mark the link as down."""
        self.__close_packet_layer()
        self.connected = False

    def __attempt_connect(self):
        """Try to connect if the back-off delay has elapsed, else idle 1 s."""
        now = time.time()
        if (self.last_connect_attempt and
                now - self.last_connect_attempt <= self.reconnect_wait_time):
            time.sleep(1)
            return

        self.last_connect_attempt = now
        try:
            self.packet_layer.connect(*self.connect_args, **self.connect_kwargs)
        except IOError:
            self.__close_packet_layer()
            self.__increment_wait_time()
        else:
            self.connected = True

    def __run(self):
        """Worker loop: keep the link up and shuttle packets both ways."""
        try:
            while self.running:
                if not self.connected:
                    self.__attempt_connect()
                    continue

                io_occurred = False

                # Attempt to read one packet from the wrapped layer.
                try:
                    read_packet = self.packet_layer.read()
                except IOError:
                    self.__drop_connection()
                    continue
                self.__reset_wait_time()
                if read_packet:
                    with self.lock:
                        self.from_packet_layer[self.__next_index()] = read_packet
                    io_occurred = True

                # Attempt to write one queued packet, if any is waiting.
                write_packet = None
                with self.lock:
                    try:
                        write_packet = self.to_packet_layer.popitem()[1]
                    except KeyError:
                        # Nothing queued (or the last entry just expired).
                        pass
                if write_packet:
                    try:
                        self.packet_layer.write(write_packet)
                    except IOError:
                        # Re-queue the packet so it is retried after the
                        # reconnect; the buffer is shared, so take the lock.
                        with self.lock:
                            self.to_packet_layer[self.__next_index()] = write_packet
                        self.__drop_connection()
                        continue
                    io_occurred = True
                    self.__reset_wait_time()

                if not io_occurred:
                    time.sleep(1)
        finally:
            # Always release the layer and clear the flag so that a later
            # connect() starts from a clean, disconnected state.
            self.__drop_connection()

    def connect(self, *args, **kwargs):
        """
        Start the background worker.

        All arguments are stored and forwarded to the wrapped layer's
        ``connect`` on every (re)connection attempt.

        :raises RuntimeError: If the worker is already running.
        """
        with self.lock:
            if self.connect_thread:
                raise RuntimeError('already connected')

            self.running = True
            self.connect_args = args
            self.connect_kwargs = kwargs
            self.connect_thread = threading.Thread(target=self.__run)
            self.connect_thread.start()

    def close(self):
        """
        Stop the background worker and wait for it to finish.

        :raises RuntimeError: If the worker is not running.
        """
        with self.lock:
            if not self.connect_thread:
                raise RuntimeError('not connected')

            self.running = False
            worker = self.connect_thread

        # Join outside the lock: the worker acquires the same lock while
        # moving packets, so joining with it held could deadlock.
        worker.join()

        with self.lock:
            self.connect_thread = None

    def read(self):
        """
        Return one buffered packet received from the wrapped layer.

        :return: A packet, or None when nothing is buffered.
        """
        with self.lock:
            try:
                return self.from_packet_layer.popitem()[1]
            except KeyError:
                # Buffer is empty.
                return None

    def write(self, packet):
        """
        Queue a packet for the worker to send through the wrapped layer.

        :param packet: Packet to send.
        """
        with self.lock:
            self.to_packet_layer[self.__next_index()] = packet
154
155
156
class IGate(object):

    """APRS-IS client: logs in, sends frames and receives frames."""

    logger = logging.getLogger(__name__)
    logger.setLevel(aprs_constants.LOG_LEVEL)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(aprs_constants.LOG_LEVEL)
    console_handler.setFormatter(aprs_constants.LOG_FORMAT)
    logger.addHandler(console_handler)
    logger.propagate = False

    def __init__(self, user, password='-1', input_url=None):
        """
        :param user: User name placed in the login line.
        :param password: Password placed in the login line.
        :param input_url: Optional URL used by the HTTP write path; defaults
            to ``aprs_constants.APRSIS_URL``.
        :type user: str
        :type password: str
        :type input_url: str
        """
        self.user = user
        self._url = input_url or aprs_constants.APRSIS_URL
        self._auth = ' '.join(
            ['user', user, 'pass', password, 'vers', 'APRS Python Module'])
        self.aprsis_sock = None
        # Text received but not yet terminated by CRLF.
        self.data_buffer = ''
        # Complete lines waiting to be returned by read().
        self.packet_buffer = []

    @staticmethod
    def __to_bytes(data):
        """Return data as bytes; text is encoded as UTF-8 (ASCII-compatible)."""
        if isinstance(data, bytes):
            return data
        return data.encode('utf-8')

    @staticmethod
    def __to_text(data):
        """Return data as the native str type, decoding bytes when needed."""
        if isinstance(data, str):
            return data
        # Undecodable bytes are replaced rather than aborting the read.
        return data.decode('utf-8', 'replace')

    def __reset_buffer(self):
        """Discard any partially received data and queued packets."""
        self.data_buffer = ''
        self.packet_buffer = []

    def connect(self, server=None, port=None, aprs_filter=None):
        """
        Connects & logs in to APRS-IS.

        Does nothing when a connection is already open. If connecting or
        sending the login line fails, the socket is closed and the error is
        re-raised, leaving this object disconnected so that a later call can
        retry.

        :param server: Optional alternative APRS-IS server.
        :param port: Optional APRS-IS port.
        :param aprs_filter: Optional filter to use; defaults to ``p/<user>``.
        :type server: str
        :type port: int
        :type aprs_filter: str
        """
        if self.aprsis_sock:
            return

        self.__reset_buffer()

        server = server or aprs_constants.APRSIS_SERVER
        port = port or aprs_constants.APRSIS_FILTER_PORT
        aprs_filter = aprs_filter or '/'.join(['p', self.user])

        self.full_auth = ' '.join([self._auth, 'filter', aprs_filter])

        self.server = server
        self.port = port
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((server, port))
            self.logger.info('Connected to server=%s port=%s', server, port)
            # NOTE(review): this debug line logs the password in clear text.
            self.logger.debug('Sending full_auth=%s', self.full_auth)
            # Lines sent to APRS-IS are terminated by CR LF.
            sock.sendall((self.full_auth + '\r\n').encode('ascii'))
        except Exception:
            # Do not keep a half-open socket: it would turn every later
            # connect() call into a silent no-op.
            sock.close()
            raise
        self.aprsis_sock = sock

    def close(self):
        """Close the connection, if open, and discard buffered data."""
        if self.aprsis_sock:
            self.aprsis_sock.close()
            self.__reset_buffer()
            self.aprsis_sock = None

    def write(self, frame_decoded, headers=None, protocol='TCP'):
        """
        Sends a frame to APRS-IS.

        :param frame_decoded: Decoded frame; it is encoded with
            ``aprs_util.encode_frame`` before sending.
        :param headers: Optional HTTP headers to post.
        :param protocol: Protocol to use: One of TCP, HTTP or UDP.
        :type headers: dict
        :type protocol: str

        :return: True on success, False otherwise (including an unknown
            protocol).
        :rtype: bool
        :raises IOError: If TCP is requested while not connected.
        """

        frame = aprs_util.encode_frame(frame_decoded)
        if 'TCP' in protocol:
            if not self.aprsis_sock:
                raise IOError('not connected')
            # Sockets need bytes on Python 3.
            self.aprsis_sock.sendall(self.__to_bytes(frame))
            return True

        if 'HTTP' in protocol:
            content = '\n'.join([self._auth, frame])
            headers = headers or aprs_constants.APRSIS_HTTP_HEADERS
            result = requests.post(self._url, data=content, headers=headers)
            return result.status_code == 204

        if 'UDP' in protocol:
            content = '\n'.join([self._auth, frame])
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                sock.sendto(
                    self.__to_bytes(content),
                    (aprs_constants.APRSIS_SERVER, aprs_constants.APRSIS_RX_PORT)
                )
            finally:
                # One-shot socket: release it even if sendto() fails.
                sock.close()
            return True

        return False

    def read(self, filter_logresp=True):
        """
        Receives from APRS-IS without blocking.

        Drains whatever data is currently waiting on the socket, splits it
        into CRLF-terminated lines and returns the first queued line decoded
        with ``aprs_util.decode_frame``.

        :param filter_logresp: When True, skip server lines that start with
            ``#`` and contain ``logresp``.
        :type filter_logresp: bool

        :return: The next decoded frame, or None when none is available.
        :raises IOError: If not connected.
        """
        if not self.aprsis_sock:
            raise IOError('not connected')

        # Drain everything that is readable right now (zero timeout).
        while select.select([self.aprsis_sock], [], [], 0)[0]:
            recvd_data = self.aprsis_sock.recv(aprs_constants.RECV_BUFFER)
            if not recvd_data:
                # An empty read on a readable socket: nothing more to take.
                # NOTE(review): this usually means the peer closed the
                # connection; consider raising IOError here so callers can
                # reconnect.
                break
            self.data_buffer += self.__to_text(recvd_data)

        # Move complete lines to the packet buffer. The final split element
        # is either an unterminated remainder or '', so it always becomes
        # the new data buffer.
        if '\r\n' in self.data_buffer:
            lines = self.data_buffer.split('\r\n')
            self.data_buffer = lines.pop()
            self.packet_buffer.extend(line for line in lines if line)

        # Return the next packet that passes the filter.
        while self.packet_buffer:
            packet = self.packet_buffer.pop(0)
            if filter_logresp and packet.startswith('#') and 'logresp' in packet:
                continue
            return aprs_util.decode_frame(packet)

        return None
289