Completed
Pull Request — master (#13)
by Jeffrey
03:21
created

Aprs.__encode_callsign()   B

Complexity

Conditions 4

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
c 1
b 0
f 0
dl 0
loc 26
rs 8.5806
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""APRS KISS Class Definitions"""
5
6
# These imports are for python3 compatibility inside python2
7
from __future__ import absolute_import
8
from __future__ import division
9
from __future__ import print_function
10
11
import logging
12
13
import apex.kiss
14
15
# Module metadata: authorship, maintainer contact, license and copyright
# strings, plus an (empty) credits list.
__author__ = 'Jeffrey Phillips Freeman (WI2ARD)'
16
__maintainer__ = 'Jeffrey Phillips Freeman (WI2ARD)'
17
__email__ = '[email protected]'
18
__license__ = 'Apache License, Version 2.0'
19
__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors'
20
__credits__ = []
21
22
23
class Aprs(object):

    """APRS interface on top of a KISS data stream.

    Frames are exchanged with callers as dicts with the keys ``source``,
    ``destination``, ``path`` (list of identity strings) and ``text``.
    Identities are formatted as ``CALLSIGN[-SSID]``; a path entry carrying
    the 0x80 flag in its SSID byte is rendered with a trailing ``*``.
    """

    def __init__(self, data_stream):
        """
        Wraps the given data stream.

        :param data_stream: Object providing ``read()`` and
            ``write(frame, port)``; it is only used from :meth:`read` and
            :meth:`write`.
        """
        # The unusual capitalisation of the attribute name is kept on purpose
        # so that any external code touching it keeps working.
        self.data_Stream = data_stream

    @staticmethod
    def __decode_frame(raw_frame):
        """
        Decodes a raw frame into an APRS frame-as-dict.

        :param raw_frame: Raw frame to decode, as an indexable sequence of
            byte values (ints), e.g. a list of ints or a ``bytearray``.

        :return: APRS frame-as-dict with the keys ``text``, ``destination``,
            ``source`` and ``path``; an empty dict when the frame is too
            short or no valid end of the address field is found.
        :rtype: dict
        """
        logging.debug('raw_frame=%s', raw_frame)
        frame = {}
        frame_len = len(raw_frame)

        if frame_len > 16:
            # Each address field is 7 bytes, so the byte that ends the address
            # section is always the last byte of a field (offsets 6, 13, ...).
            # Between 2 and 10 address fields are accepted, i.e. offsets 13 to
            # 69. The scan stops two bytes before the end of the frame so the
            # two bytes inspected after the address section always exist and a
            # truncated frame yields an empty dict instead of an IndexError.
            scan_end = min(frame_len - 2, 70)
            for raw_slice in range(13, scan_end, 7):
                # Low bit set marks the last byte of the address section.
                if not raw_frame[raw_slice] & 0x01:
                    continue
                # The two bytes after the addresses must have both low bits
                # set (first byte) and be 0xf0 or 0xcf (second byte).
                if raw_frame[raw_slice + 1] & 0x03 != 0x03:
                    continue
                if raw_frame[raw_slice + 2] not in (0xf0, 0xcf):
                    continue

                address_count = (raw_slice + 1) // 7
                frame['text'] = ''.join(map(chr, raw_frame[raw_slice + 3:]))
                frame['destination'] = Aprs.__identity_as_string(Aprs.__extract_callsign(raw_frame))
                frame['source'] = Aprs.__identity_as_string(Aprs.__extract_callsign(raw_frame[7:]))
                frame['path'] = Aprs.__extract_path(address_count, raw_frame)
                break

        logging.debug('frame=%s', frame)
        return frame

    @staticmethod
    def __extract_path(start, raw_frame):
        """Extracts the path from a raw frame.

        :param start: Total number of 7-byte address fields in the frame
            (destination and source included); path entries are the fields
            with index 2 up to ``start - 1``.
        :param raw_frame: Raw frame as an indexable sequence of byte values.

        :return: Path entries as identity strings; entries whose SSID byte
            has the 0x80 bit set get a ``*`` suffix. Entries with an empty
            identity string are skipped.
        :rtype: list
        """
        full_path = []

        for field_index in range(2, start):
            offset = field_index * 7
            hop = Aprs.__identity_as_string(Aprs.__extract_callsign(raw_frame[offset:]))
            if not hop:
                continue
            if raw_frame[offset + 6] & 0x80:
                hop += '*'
            full_path.append(hop)

        return full_path

    @staticmethod
    def __extract_callsign(raw_frame):
        """
        Extracts the callsign and SSID from the first 7 bytes of a sequence.

        :param raw_frame: Sequence of byte values starting at an address
            field (6 callsign bytes followed by one SSID byte).
        :returns: Dict with ``callsign`` (str, surrounding whitespace
            stripped) and ``ssid`` (int, 0-15).
        :rtype: dict
        """
        # Callsign characters are stored shifted left by one bit.
        callsign = ''.join(chr(value >> 1) for value in raw_frame[:6]).strip()
        # The SSID occupies bits 1-4 of the seventh byte.
        ssid = (raw_frame[6] >> 1) & 0x0f
        return {'callsign': callsign, 'ssid': ssid}

    @staticmethod
    def __identity_as_string(identity):
        """
        Returns a fully-formatted identity (callsign plus optional SSID).

        :param identity: Callsign dictionary ``{'callsign': '', 'ssid': n}``.
        :type identity: dict
        :returns: ``Callsign-SSID`` when the SSID is greater than zero,
            otherwise just the callsign.
        :rtype: str
        """
        if identity['ssid'] > 0:
            return '-'.join([identity['callsign'], str(identity['ssid'])])
        return identity['callsign']

    @staticmethod
    def __encode_frame(frame):
        """
        Encodes an APRS frame-as-dict as a list of byte values.

        :param frame: APRS frame-as-dict with the keys ``destination``,
            ``source``, ``path`` and ``text``. ``text`` may be a list of
            byte values, a byte string or a text string.
        :type frame: dict

        :return: Encoded frame: address fields, two header bytes, payload.
        :rtype: list
        """
        enc_frame = (
            Aprs.__encode_callsign(Aprs.__parse_identity_string(frame['destination'])) +
            Aprs.__encode_callsign(Aprs.__parse_identity_string(frame['source']))
        )
        for hop in frame['path']:
            enc_frame += Aprs.__encode_path_hop(hop)

        # Flag the final address byte as the end of the address section.
        enc_frame[-1] |= 0x01

        # NOTE(review): a KISS constant is reused here as the byte following
        # the address section; presumably its value matches the 0x03 pattern
        # the decoder checks for - confirm against apex.kiss.constants.
        header = [apex.kiss.constants.SLOT_TIME, 0xf0]
        return enc_frame + header + Aprs.__encode_text(frame['text'])

    @staticmethod
    def __encode_path_hop(hop):
        """
        Encodes one path entry, keeping its ``*`` marker.

        Parsing strips a trailing ``*``, so the marker is re-applied here as
        the 0x80 bit of the SSID byte; this keeps a decode/encode round trip
        of an entry such as ``WIDE1-1*`` lossless.

        :param hop: Path entry as identity string, optionally ending in ``*``.
        :type hop: str
        :return: Seven encoded byte values.
        :rtype: list
        """
        encoded = Aprs.__encode_callsign(Aprs.__parse_identity_string(hop))
        if hop.endswith('*'):
            encoded[-1] |= 0x80
        return encoded

    @staticmethod
    def __encode_text(text):
        """
        Converts a frame payload into a list of byte values.

        :param text: Payload as a list of ints, a byte string or a text
            string (the decoder returns text strings).
        :return: Payload as a new list of ints.
        :rtype: list
        """
        if isinstance(text, (bytes, bytearray)):
            return list(bytearray(text))
        if isinstance(text, type(u'')):
            return [ord(char) for char in text]
        return list(text)

    @staticmethod
    def __encode_callsign(callsign):
        """
        Encodes a callsign-dict as a 7-byte address field.

        :param callsign: Callsign-dict ``{'callsign': str, 'ssid': int}``;
            a ``*`` anywhere in the callsign is removed and sets the 0x80
            bit of the SSID byte.
        :type callsign: dict

        :return: Encoded callsign (6 shifted characters plus SSID byte for
            callsigns of up to 6 characters).
        :rtype: list
        """
        call_sign = callsign['callsign']
        enc_ssid = (callsign['ssid'] << 1) | 0x60

        if '*' in call_sign:
            call_sign = call_sign.replace('*', '')
            enc_ssid |= 0x80

        # Pad with spaces to the fixed 6-character width, then store every
        # character shifted left by one bit.
        return [ord(char) << 1 for char in call_sign.ljust(6)] + [enc_ssid]

    @staticmethod
    def __parse_identity_string(identity_string):
        """
        Creates a callsign-as-dict from a callsign-as-string.

        :param identity_string: Callsign-as-string (with or without SSID,
            optionally ending in ``*``).
        :type identity_string: str

        :return: Callsign-as-dict ``{'callsign': str, 'ssid': int}``.
        :rtype: dict
        :raises ValueError: If the string contains more than one ``-`` or
            the SSID part is not an integer.
        """
        # If we are parsing a spent token then first get rid of the asterisk suffix.
        if identity_string.endswith('*'):
            identity_string = identity_string[:-1]

        if '-' in identity_string:
            call_sign, ssid = identity_string.split('-')
        else:
            call_sign = identity_string
            ssid = 0
        return {'callsign': call_sign, 'ssid': int(ssid)}

    def write(self, frame, port=0):
        """Writes an APRS frame to the underlying data stream.

        :param frame: APRS frame-as-dict to encode and write.
        :type frame: dict
        :param port: Passed through unchanged as second argument of the
            data stream's ``write``.
        """
        encoded_frame = Aprs.__encode_frame(frame)
        self.data_Stream.write(encoded_frame, port)

    def read(self):
        """Reads one frame from the underlying data stream and decodes it.

        :return: Decoded APRS frame-as-dict (possibly empty when the data
            could not be decoded), or ``None`` when the stream returned
            ``None`` or an empty frame.
        """
        frame = self.data_Stream.read()
        if frame is None or len(frame) == 0:
            return None
        return Aprs.__decode_frame(frame)
196