Completed
Push — master ( e0a035...ea94b0 )
by Jeffrey
03:08
created

AprsKiss.__encode_callsign()   B

Complexity

Conditions 4

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 26
rs 8.5806
cc 4
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""APRS KISS Class Definitions"""
5
6
# These imports are for python3 compatability inside python2
7
from __future__ import absolute_import
8
from __future__ import division
9
from __future__ import print_function
10
11
import logging
12
import threading
13
14
import apex.kiss
15
16
__author__ = 'Jeffrey Phillips Freeman (WI2ARD)'
17
__maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)'
18
__email__ = '[email protected]'
19
__license__ = 'Apache License, Version 2.0'
20
__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors'
21
__credits__ = []
22
23
24
class AprsKiss(object):
25
26
    """APRS interface."""
27
28
    def __init__(self, data_stream):
29
        self.data_stream = data_stream
30
        self.lock = threading.Lock()
31
32
    @staticmethod
33
    def __decode_frame(raw_frame):
34
        """
35
        Decodes a KISS-encoded APRS frame.
36
37
        :param raw_frame: KISS-encoded frame to decode.
38
        :type raw_frame: str
39
40
        :return: APRS frame-as-dict.
41
        :rtype: dict
42
        """
43
        logging.debug('raw_frame=%s', raw_frame)
44
        frame = {}
45
        frame_len = len(raw_frame)
46
47
        if frame_len > 16:
48
            for raw_slice in range(0, frame_len - 2):
49
                # Is address field length correct?
50
                if raw_frame[raw_slice] & 0x01 and ((raw_slice + 1) % 7) == 0:
51
                    i = (raw_slice + 1) / 7
52
                    # Less than 2 callsigns?
53
                    if 1 < i < 11:
54
                        if raw_frame[raw_slice + 1] & 0x03 is 0x03 and raw_frame[raw_slice + 2] in [0xf0, 0xcf]:
55
                            frame['text'] = ''.join(map(chr, raw_frame[raw_slice + 3:]))
56
                            frame['destination'] = AprsKiss.__identity_as_string(AprsKiss.__extract_callsign(raw_frame))
57
                            frame['source'] = AprsKiss.__identity_as_string(AprsKiss.__extract_callsign(raw_frame[7:]))
58
                            frame['path'] = AprsKiss.__extract_path(int(i), raw_frame)
59
                            return frame
60
61
        logging.debug('frame=%s', frame)
62
        return frame
63
64
    @staticmethod
65
    def __extract_path(start, raw_frame):
66
        """Extracts path from raw APRS KISS frame.
67
68
        :param start:
69
        :param raw_frame: Raw APRS frame from a KISS device.
70
71
        :return: Full path from APRS frame.
72
        :rtype: list
73
        """
74
        full_path = []
75
76
        for i in range(2, start):
77
            path = AprsKiss.__identity_as_string(AprsKiss.__extract_callsign(raw_frame[i * 7:]))
78
            if path:
79
                if raw_frame[i * 7 + 6] & 0x80:
80
                    full_path.append(''.join([path, '*']))
81
                else:
82
                    full_path.append(path)
83
84
        return full_path
85
86
    @staticmethod
87
    def __extract_callsign(raw_frame):
88
        """
89
        Extracts callsign from a raw KISS frame.
90
91
        :param raw_frame: Raw KISS Frame to decode.
92
        :returns: Dict of callsign and ssid.
93
        :rtype: dict
94
        """
95
        callsign = ''.join([chr(x >> 1) for x in raw_frame[:6]]).strip()
96
        ssid = ((raw_frame[6]) >> 1) & 0x0f
97
        return {'callsign': callsign, 'ssid': ssid}
98
99
    @staticmethod
100
    def __identity_as_string(identity):
101
        """
102
        Returns a fully-formatted callsign (Callsign + SSID).
103
104
        :param identity: Callsign Dictionary {'callsign': '', 'ssid': n}
105
        :type callsign: dict
106
        :returns: Callsign[-SSID].
107
        :rtype: str
108
        """
109
        if identity['ssid'] > 0:
110
            return '-'.join([identity['callsign'], str(identity['ssid'])])
111
        else:
112
            return identity['callsign']
113
114
    @staticmethod
115
    def __encode_frame(frame):
116
        """
117
        Encodes an APRS frame-as-dict as a KISS frame.
118
119
        :param frame: APRS frame-as-dict to encode.
120
        :type frame: dict
121
122
        :return: KISS-encoded APRS frame.
123
        :rtype: list
124
        """
125
        enc_frame = AprsKiss.__encode_callsign(AprsKiss.__parse_identity_string(frame['destination'])) + \
126
            AprsKiss.__encode_callsign(AprsKiss.__parse_identity_string(frame['source']))
127
        for p in frame['path']:
128
            enc_frame += AprsKiss.__encode_callsign(AprsKiss.__parse_identity_string(p))
129
130
        return enc_frame[:-1] + [enc_frame[-1] | 0x01] + [apex.kiss.constants.SLOT_TIME] + [0xf0]\
131
            + [ord(c) for c in frame['text']]
132
133
    @staticmethod
134
    def __encode_callsign(callsign):
135
        """
136
        Encodes a callsign-dict within a KISS frame.
137
138
        :param callsign: Callsign-dict to encode.
139
        :type callsign: dict
140
141
        :return: KISS-encoded callsign.
142
        :rtype: list
143
        """
144
        call_sign = callsign['callsign']
145
146
        enc_ssid = (callsign['ssid'] << 1) | 0x60
147
148
        if '*' in call_sign:
149
            call_sign = call_sign.replace('*', '')
150
            enc_ssid |= 0x80
151
152
        while len(call_sign) < 6:
153
            call_sign = ''.join([call_sign, ' '])
154
155
        encoded = []
156
        for p in call_sign:
157
            encoded += [ord(p) << 1]
158
        return encoded + [enc_ssid]
159
160
    @staticmethod
161
    def __parse_identity_string(identity_string):
162
        """
163
        Creates callsign-as-dict from callsign-as-string.
164
165
        :param identity_string: Callsign-as-string (with or without ssid).
166
        :type raw_callsign: str
167
168
        :return: Callsign-as-dict.
169
        :rtype: dict
170
        """
171
        # If we are parsing a spent token then first lets get rid of the astresick suffix.
172
        if identity_string.endswith('*'):
173
            identity_string = identity_string[:-1]
174
175
        if '-' in identity_string:
176
            call_sign, ssid = identity_string.split('-')
177
        else:
178
            call_sign = identity_string
179
            ssid = 0
180
        return {'callsign': call_sign, 'ssid': int(ssid)}
181
182
    def connect(self, *args, **kwargs):
183
        self.data_stream.connect(*args, **kwargs)
184
185
    def close(self, *args, **kwargs):
186
        self.data_stream.close(*args, **kwargs)
187
188
    def write(self, frame, *args, **kwargs):
189
        """Writes APRS-encoded frame to KISS device.
190
191
        :param frame: APRS frame to write to KISS device.
192
        :type frame: dict
193
        """
194
        with self.lock:
195
            encoded_frame = AprsKiss.__encode_frame(frame)
196
            self.data_stream.write(encoded_frame, *args, **kwargs)
197
198
    def read(self, *args, **kwargs):
199
        """Reads APRS-encoded frame from KISS device.
200
        """
201
        with self.lock:
202
            frame = self.data_stream.read(*args, **kwargs)
203
            if frame is not None and len(frame):
204
                return AprsKiss.__decode_frame(frame)
205
            else:
206
                return None
207