Completed
Push — master ( 749c22...2aa77e )
by Jeffrey
03:14
created

AprsKiss   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 203
Duplicated Lines 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
dl 0
loc 203
rs 8.3396
c 3
b 0
f 0
wmc 44

13 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 3 1
D __decode_frame() 0 31 8
A connect() 0 2 1
C __valid_frame() 0 18 8
A __encode_frame() 0 18 3
B __encode_callsign() 0 26 4
A __identity_as_string() 0 14 2
A __parse_identity_string() 0 21 3
A write() 0 10 3
A __extract_path() 0 21 4
A __extract_callsign() 0 12 2
A read() 0 9 4
A close() 0 2 1

How to fix   Complexity   

Complex Class

Complex classes like AprsKiss often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""APRS KISS Class Definitions"""
5
6
# These imports are for python3 compatibility inside python2
7
from __future__ import absolute_import
8
from __future__ import division
9
from __future__ import print_function
10
11
import logging
12
import threading
13
14
import apex.kiss
15
16
__author__ = 'Jeffrey Phillips Freeman (WI2ARD)'
17
__maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)'
18
__email__ = '[email protected]'
19
__license__ = 'Apache License, Version 2.0'
20
__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors'
21
__credits__ = []
22
23
24
class AprsKiss(object):
25
26
    """APRS interface."""
27
28
    def __init__(self, data_stream):
29
        self.data_stream = data_stream
30
        self.lock = threading.Lock()
31
32
    @staticmethod
33
    def __decode_frame(raw_frame):
34
        """
35
        Decodes a KISS-encoded APRS frame.
36
37
        :param raw_frame: KISS-encoded frame to decode.
38
        :type raw_frame: str
39
40
        :return: APRS frame-as-dict.
41
        :rtype: dict
42
        """
43
        logging.debug('raw_frame=%s', raw_frame)
44
        frame = {}
45
        frame_len = len(raw_frame)
46
47
        if frame_len > 16:
48
            for raw_slice in range(0, frame_len - 2):
49
                # Is address field length correct?
50
                if raw_frame[raw_slice] & 0x01 and ((raw_slice + 1) % 7) == 0:
51
                    i = (raw_slice + 1) / 7
52
                    # Less than 2 callsigns?
53
                    if 1 < i < 11:
54
                        if raw_frame[raw_slice + 1] & 0x03 is 0x03 and raw_frame[raw_slice + 2] in [0xf0, 0xcf]:
55
                            frame['text'] = ''.join(map(chr, raw_frame[raw_slice + 3:]))
56
                            frame['destination'] = AprsKiss.__identity_as_string(AprsKiss.__extract_callsign(raw_frame))
57
                            frame['source'] = AprsKiss.__identity_as_string(AprsKiss.__extract_callsign(raw_frame[7:]))
58
                            frame['path'] = AprsKiss.__extract_path(int(i), raw_frame)
59
                            return frame
60
61
        logging.debug('frame=%s', frame)
62
        return frame
63
64
    @staticmethod
65
    def __valid_frame(raw_frame):
66
        logging.debug('raw_frame=%s', raw_frame)
67
        frame = {}
68
        frame_len = len(raw_frame)
69
70
        if frame_len > 16:
71
            for raw_slice in range(0, frame_len - 2):
72
                # Is address field length correct?
73
                if raw_frame[raw_slice] & 0x01 and ((raw_slice + 1) % 7) == 0:
74
                    i = (raw_slice + 1) / 7
75
                    # Less than 2 callsigns?
76
                    if 1 < i < 11:
77
                        if raw_frame[raw_slice + 1] & 0x03 is 0x03 and raw_frame[raw_slice + 2] in [0xf0, 0xcf]:
78
                            return True
79
80
        logging.debug('frame=%s', frame)
81
        return False
82
83
    @staticmethod
84
    def __extract_path(start, raw_frame):
85
        """Extracts path from raw APRS KISS frame.
86
87
        :param start:
88
        :param raw_frame: Raw APRS frame from a KISS device.
89
90
        :return: Full path from APRS frame.
91
        :rtype: list
92
        """
93
        full_path = []
94
95
        for i in range(2, start):
96
            path = AprsKiss.__identity_as_string(AprsKiss.__extract_callsign(raw_frame[i * 7:]))
97
            if path:
98
                if raw_frame[i * 7 + 6] & 0x80:
99
                    full_path.append(''.join([path, '*']))
100
                else:
101
                    full_path.append(path)
102
103
        return full_path
104
105
    @staticmethod
106
    def __extract_callsign(raw_frame):
107
        """
108
        Extracts callsign from a raw KISS frame.
109
110
        :param raw_frame: Raw KISS Frame to decode.
111
        :returns: Dict of callsign and ssid.
112
        :rtype: dict
113
        """
114
        callsign = ''.join([chr(x >> 1) for x in raw_frame[:6]]).strip()
115
        ssid = ((raw_frame[6]) >> 1) & 0x0f
116
        return {'callsign': callsign, 'ssid': ssid}
117
118
    @staticmethod
119
    def __identity_as_string(identity):
120
        """
121
        Returns a fully-formatted callsign (Callsign + SSID).
122
123
        :param identity: Callsign Dictionary {'callsign': '', 'ssid': n}
124
        :type callsign: dict
125
        :returns: Callsign[-SSID].
126
        :rtype: str
127
        """
128
        if identity['ssid'] > 0:
129
            return '-'.join([identity['callsign'], str(identity['ssid'])])
130
        else:
131
            return identity['callsign']
132
133
    @staticmethod
134
    def __encode_frame(frame):
135
        """
136
        Encodes an APRS frame-as-dict as a KISS frame.
137
138
        :param frame: APRS frame-as-dict to encode.
139
        :type frame: dict
140
141
        :return: KISS-encoded APRS frame.
142
        :rtype: list
143
        """
144
        enc_frame = AprsKiss.__encode_callsign(AprsKiss.__parse_identity_string(frame['destination'])) + \
145
            AprsKiss.__encode_callsign(AprsKiss.__parse_identity_string(frame['source']))
146
        for p in frame['path']:
147
            enc_frame += AprsKiss.__encode_callsign(AprsKiss.__parse_identity_string(p))
148
149
        return enc_frame[:-1] + [enc_frame[-1] | 0x01] + [apex.kiss.constants.SLOT_TIME] + [0xf0]\
150
            + [ord(c) for c in frame['text']]
151
152
    @staticmethod
153
    def __encode_callsign(callsign):
154
        """
155
        Encodes a callsign-dict within a KISS frame.
156
157
        :param callsign: Callsign-dict to encode.
158
        :type callsign: dict
159
160
        :return: KISS-encoded callsign.
161
        :rtype: list
162
        """
163
        call_sign = callsign['callsign']
164
165
        enc_ssid = (callsign['ssid'] << 1) | 0x60
166
167
        if '*' in call_sign:
168
            call_sign = call_sign.replace('*', '')
169
            enc_ssid |= 0x80
170
171
        while len(call_sign) < 6:
172
            call_sign = ''.join([call_sign, ' '])
173
174
        encoded = []
175
        for p in call_sign:
176
            encoded += [ord(p) << 1]
177
        return encoded + [enc_ssid]
178
179
    @staticmethod
180
    def __parse_identity_string(identity_string):
181
        """
182
        Creates callsign-as-dict from callsign-as-string.
183
184
        :param identity_string: Callsign-as-string (with or without ssid).
185
        :type raw_callsign: str
186
187
        :return: Callsign-as-dict.
188
        :rtype: dict
189
        """
190
        # If we are parsing a spent token then first lets get rid of the astresick suffix.
191
        if identity_string.endswith('*'):
192
            identity_string = identity_string[:-1]
193
194
        if '-' in identity_string:
195
            call_sign, ssid = identity_string.split('-')
196
        else:
197
            call_sign = identity_string
198
            ssid = 0
199
        return {'callsign': call_sign, 'ssid': int(ssid)}
200
201
    def connect(self, *args, **kwargs):
202
        self.data_stream.connect(*args, **kwargs)
203
204
    def close(self, *args, **kwargs):
205
        self.data_stream.close(*args, **kwargs)
206
207
    def write(self, frame, *args, **kwargs):
208
        """Writes APRS-encoded frame to KISS device.
209
210
        :param frame: APRS frame to write to KISS device.
211
        :type frame: dict
212
        """
213
        with self.lock:
214
            encoded_frame = AprsKiss.__encode_frame(frame)
215
            if AprsKiss.__valid_frame(encoded_frame):
216
                self.data_stream.write(encoded_frame, *args, **kwargs)
217
218
    def read(self, *args, **kwargs):
219
        """Reads APRS-encoded frame from KISS device.
220
        """
221
        with self.lock:
222
            frame = self.data_stream.read(*args, **kwargs)
223
            if frame is not None and len(frame):
224
                return AprsKiss.__decode_frame(frame)
225
            else:
226
                return None
227