Completed
Push — master ( c8f98a...854f8e )
by Jeffrey
03:43
created

ReconnectingPacketBuffer.__next_index()   A

Complexity

Conditions 2

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
dl 0
loc 4
rs 10
c 1
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""APRS Internet Service Class Definitions"""
5
6
# These imports are for python3 compatability inside python2
7
from __future__ import absolute_import
8
from __future__ import division
9
from __future__ import print_function
10
11
import logging
12
import select
13
import socket
14
import threading
15
import time
16
import cachetools
17
import requests
18
19
from apex.aprs import constants as aprs_constants
20
from apex.aprs import util as aprs_util
21
22
__author__ = 'Jeffrey Phillips Freeman (WI2ARD)'
23
__maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)'
24
__email__ = '[email protected]'
25
__license__ = 'Apache License, Version 2.0'
26
__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors'
27
__credits__ = []
28
29
30
class ReconnectingPacketBuffer(object):
31
32
    STARTING_WAIT_TIME = 2
33
    MAX_WAIT_TIME = 300
34
    WAIT_TIME_MULTIPLIER = 2
35
    MAX_INDEX = 1000000
36
37
    def __init__(self, packet_layer):
38
        self.packet_layer = packet_layer
39
        self.to_packet_layer = cachetools.TTLCache(10, 30)
40
        self.current_index = 0
41
        self.from_packet_layer = cachetools.TTLCache(10, 30)
42
        self.connect_thread = None
43
        self.lock = threading.Lock()
44
        self.running = False
45
        self.reconnect_wait_time = self.STARTING_WAIT_TIME
46
        self.last_connect_attempt = None
47
        self.connect_args = None
48
        self.connect_kwargs = None
49
        self.connected = False
50
51
    def __increment_wait_time(self):
52
        self.reconnect_wait_time *= self.WAIT_TIME_MULTIPLIER
53
        if self.reconnect_wait_time > self.MAX_WAIT_TIME:
54
            self.reconnect_wait_time = self.MAX_WAIT_TIME
55
56
    def __reset_wait_time(self):
57
        self.reconnect_wait_time = self.STARTING_WAIT_TIME
58
59
    def __run(self):
60
        while self.running:
61
            if not self.connected:
62
                if not self.last_connect_attempt or time.time() - self.last_connect_attempt > self.reconnect_wait_time:
63
                    try:
64
                        self.last_connect_attempt = time.time()
65
                        self.packet_layer.connect(*self.connect_args, **self.connect_kwargs)
66
                        self.connected = True
67
                    except IOError:
68
                        try:
69
                            self.packet_layer.close()
70
                        except IOError:
71
                            pass
72
                        self.__increment_wait_time()
73
                else:
74
                    time.sleep(1)
75
            else:
76
                io_occured = False
77
78
                # lets attempt to read in a packet
79
                try:
80
                    read_packet = self.packet_layer.read()
81
                    self.__reset_wait_time()
82
                    if read_packet:
83
                        with self.lock:
84
                                self.from_packet_layer[str(aprs_util.hash_frame(read_packet))] = read_packet
85
                        io_occured = True
86
                except IOError:
87
                    try:
88
                        self.packet_layer.close()
89
                    except IOError:
90
                        pass
91
                    self.connected = False
92
                    continue
93
94
                # lets try to write a packet, if any are waiting.
95
                write_packet = None
96
                with self.lock:
97
                    if self.to_packet_layer:
98
                        write_packet = self.to_packet_layer.popitem()[1]
99
                if write_packet:
100
                    try:
101
                        self.packet_layer.write(write_packet)
102
                        io_occured = True
103
                        self.__reset_wait_time()
104
                    except IOError:
105
                        self.to_packet_layer[str(aprs_util.hash_frame(read_packet))] = write_packet
106
                        try:
107
                            self.packet_layer.close()
108
                        except IOError:
109
                            pass
110
                        self.connected = False
111
                        continue
112
113
                if not io_occured:
114
                    time.sleep(1)
115
        try:
116
            self.packet_layer.close()
117
        except IOError:
118
            pass
119
120
    def connect(self, *args, **kwargs):
121
        with self.lock:
122
            if self.connect_thread:
123
                raise RuntimeError('already connected')
124
125
            self.running = True
126
            self.connect_args = args
127
            self.connect_kwargs = kwargs
128
            self.connect_thread = threading.Thread(target=self.__run)
129
            self.connect_thread.start()
130
131
    def close(self):
132
        with self.lock:
133
            if not self.connect_thread:
134
                raise RuntimeError('not connected')
135
136
            self.running = False
137
            self.connect_thread.join()
138
            self.connect_thread = None
139
140
    def read(self):
141
        with self.lock:
142
            if self.from_packet_layer:
143
                return self.from_packet_layer.popitem()[1]
144
        return None
145
146
    def write(self, packet):
147
        with self.lock:
148
            self.to_packet_layer[str(aprs_util.hash_frame(read_packet))] = packet
149
150
151
class IGate(object):
152
153
    """APRS Object."""
154
155
    logger = logging.getLogger(__name__)
156
    logger.setLevel(aprs_constants.LOG_LEVEL)
157
    console_handler = logging.StreamHandler()
158
    console_handler.setLevel(aprs_constants.LOG_LEVEL)
159
    console_handler.setFormatter(aprs_constants.LOG_FORMAT)
160
    logger.addHandler(console_handler)
161
    logger.propagate = False
162
163
    def __init__(self, user, password='-1', input_url=None):
164
        self.user = user
165
        self._url = input_url or aprs_constants.APRSIS_URL
166
        self._auth = ' '.join(
167
            ['user', user, 'pass', password, 'vers', 'APRS Python Module'])
168
        self.aprsis_sock = None
169
        self.data_buffer = ''
170
        self.packet_buffer = []
171
172
    def __reset_buffer(self):
173
        self.data_buffer = ''
174
        self.packet_buffer = []
175
176
    def connect(self, server=None, port=None, aprs_filter=None):
177
        """
178
        Connects & logs in to APRS-IS.
179
180
        :param server: Optional alternative APRS-IS server.
181
        :param port: Optional APRS-IS port.
182
        :param filter: Optional filter to use.
183
        :type server: str
184
        :type port: int
185
        :type filte: str
186
        """
187
        if not self.aprsis_sock:
188
            self.__reset_buffer()
189
190
            server = server or aprs_constants.APRSIS_SERVER
191
            port = port or aprs_constants.APRSIS_FILTER_PORT
192
            aprs_filter = aprs_filter or '/'.join(['p', self.user])
193
194
            self.full_auth = ' '.join([self._auth, 'filter', aprs_filter])
195
196
            self.server = server
197
            self.port = port
198
            self.aprsis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
199
            self.aprsis_sock.connect((server, port))
200
            self.logger.info('Connected to server=%s port=%s', server, port)
201
            self.logger.debug('Sending full_auth=%s', self.full_auth)
202
            self.aprsis_sock.sendall((self.full_auth + '\n\r').encode('ascii'))
203
204
    def close(self):
205
        if self.aprsis_sock:
206
            self.aprsis_sock.close()
207
            self.__reset_buffer()
208
            self.aprsis_sock = None
209
210
    def write(self, frame_decoded, headers=None, protocol='TCP'):
211
        """
212
        Sends message to APRS-IS.
213
214
        :param message: Message to send to APRS-IS.
215
        :param headers: Optional HTTP headers to post.
216
        :param protocol: Protocol to use: One of TCP, HTTP or UDP.
217
        :type message: str
218
        :type headers: dict
219
220
        :return: True on success, False otherwise.
221
        :rtype: bool
222
        """
223
224
        frame = aprs_util.encode_frame(frame_decoded)
225
        if 'TCP' in protocol:
226
            self.aprsis_sock.sendall(frame)
227
            return True
228
        elif 'HTTP' in protocol:
229
            content = '\n'.join([self._auth, frame])
230
            headers = headers or aprs_constants.APRSIS_HTTP_HEADERS
231
            result = requests.post(self._url, data=content, headers=headers)
232
            return 204 == result.status_code
233
        elif 'UDP' in protocol:
234
            content = '\n'.join([self._auth, frame])
235
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
236
            sock.sendto(
237
                content,
238
                (aprs_constants.APRSIS_SERVER, aprs_constants.APRSIS_RX_PORT)
239
            )
240
            return True
241
242
    def read(self, filter_logresp=True):
243
        """
244
        Receives from APRS-IS.
245
246
        :param callback: Optional callback to deliver data to.
247
        :type callback: func
248
        """
249
        # check if there is any data waiting
250
        read_more = True
251
        while read_more:
252
            selected = select.select([self.aprsis_sock], [], [], 0)
253
            if len(selected[0]) > 0:
254
                recvd_data = self.aprsis_sock.recv(aprs_constants.RECV_BUFFER)
255
                if not recvd_data:
256
                    self.data_buffer += recvd_data
257
                else:
258
                    read_more = False
259
            else:
260
                read_more = False
261
262
        # check for any complete packets and move them to the packet buffer
263
        if '\r\n' in self.data_buffer:
264
            partial = True
265
            if self.data_buffer.endswith('\r\n'):
266
                partial = False
267
            packets = recvd_data.split('\r\n')
268
            if partial:
269
                self.data_buffer = str(packets.pop(-1))
270
            else:
271
                self.data_buffer = ''
272
            for packet in packets:
273
                self.packet_buffer += [str(packet)]
274
275
        # return the next packet that matches the filter
276
        while len(self.packet_buffer):
277
            packet = self.packet_buffer.pop(0)
278
            if filter_logresp and packet.startswith('#') and 'logresp' in packet:
279
                pass
280
            else:
281
                return aprs_util.decode_frame(packet)
282
283
        return None
284