Completed
Pull Request — master (#2)
by Jeffrey
04:10
created

Kiss.__init__()   B

Complexity

Conditions 4

Size

Total Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 28
rs 8.5806
cc 4
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""KISS Core Classes."""
5
6
# These imports are for python3 compatability inside python2
7
from __future__ import absolute_import
8
from __future__ import division
9
from __future__ import print_function
10
from __future__ import unicode_literals
11
12
import logging
13
import socket
14
import serial
15
16
from apex.kiss import constants as kissConstants
17
18
__author__ = 'Jeffrey Phillips Freeman (WI2ARD)'
19
__maintainer__ = "Jeffrey Phillips Freeman (WI2ARD)"
20
__email__ = "[email protected]"
21
__license__ = 'Apache License, Version 2.0'
22
__copyright__ = 'Copyright 2016, Syncleus, Inc. and contributors'
23
__credits__ = []
24
25
26
class Kiss(object):
27
28
    """KISS Object Class."""
29
30
    logger = logging.getLogger(__name__)
31
    logger.setLevel(kissConstants.LOG_LEVEL)
32
    console_handler = logging.StreamHandler()
33
    console_handler.setLevel(kissConstants.LOG_LEVEL)
34
    formatter = logging.Formatter(kissConstants.LOG_FORMAT)
35
    console_handler.setFormatter(formatter)
36
    logger.addHandler(console_handler)
37
    logger.propagate = False
38
39
    frame_buffer = []
40
41
    def __init__(self, com_port=None,
42
                 baud=38400,
43
                 parity=serial.PARITY_NONE,
44
                 stop_bits=serial.STOPBITS_ONE,
45
                 byte_size=serial.EIGHTBITS,
46
                 host=None,
47
                 tcp_port=8000,
48
                 strip_df_start=True):
49
        self.com_port = com_port
50
        self.baud = baud
51
        self.parity = parity
52
        self.stop_bits = stop_bits
53
        self.byte_size = byte_size
54
        self.host = host
55
        self.tcp_port = tcp_port
56
        self.interface = None
57
        self.interface_mode = None
58
        self.strip_df_start = strip_df_start
59
        self.exit_kiss = False
60
61
        if self.com_port is not None:
62
            self.interface_mode = 'serial'
63
        elif self.host is not None:
64
            self.interface_mode = 'tcp'
65
        if self.interface_mode is None:
66
            raise Exception('Must set port/speed or host/tcp_port.')
67
68
        self.logger.info('Using interface_mode=%s', self.interface_mode)
69
70
    def __enter__(self):
71
        return self
72
73
    def __exit__(self, exc_type, exc_val, exc_tb):
74
        if 'tcp' in self.interface_mode:
75
            self.interface.shutdown()  # pylint: disable=some-message,another-one
76
        elif self.interface and self.interface.isOpen():
77
            self.interface.close()
78
79
    def __del__(self):
80
        if self.interface and self.interface.isOpen():
81
            self.interface.close()
82
83
    def __read_interface(self):
84
        if 'tcp' in self.interface_mode:
85
            return self.interface.recv(kissConstants.READ_BYTES)
86
        elif 'serial' in self.interface_mode:
87
            read_data = self.interface.read(kissConstants.READ_BYTES)
88
            waiting_data = self.interface.inWaiting()
89
            if waiting_data:
90
                read_data += self.interface.read(waiting_data)
91
            return read_data
92
93
    @staticmethod
94
    def __strip_df_start(frame):
95
        """
96
        Strips KISS DATA_FRAME start (0x00) and newline from frame.
97
98
        :param frame: APRS/AX.25 frame.
99
        :type frame: str
100
        :returns: APRS/AX.25 frame sans DATA_FRAME start (0x00).
101
        :rtype: str
102
        """
103
        while frame[0] is kissConstants.DATA_FRAME:
104
            del frame[0]
105
        while chr(frame[0]).isspace():
106
            del frame[0]
107
        while chr(frame[-1]).isspace():
108
            del frame[-1]
109
        return frame
110
111
    @staticmethod
112
    def __escape_special_codes(raw_code_bytes):
113
        """
114
        Escape special codes, per KISS spec.
115
116
        "If the FEND or FESC codes appear in the data to be transferred, they
117
        need to be escaped. The FEND code is then sent as FESC, TFEND and the
118
        FESC is then sent as FESC, TFESC."
119
        - http://en.wikipedia.org/wiki/KISS_(TNC)#Description
120
        """
121
        encoded_bytes = []
122
        for raw_code_byte in raw_code_bytes:
123
            if raw_code_byte is kissConstants.FESC:
124
                encoded_bytes += kissConstants.FESC_TFESC
125
            elif raw_code_byte is kissConstants.FEND:
126
                encoded_bytes += kissConstants.FESC_TFEND
127
            else:
128
                encoded_bytes += [raw_code_byte]
129
        return encoded_bytes
130
131
    @staticmethod
132
    def __command_byte_combine(port, command_code):
133
        """
134
        Constructs the command byte for the tnc which includes the tnc port and command code.
135
        :param port: integer from 0 to 127 indicating the TNC port
136
        :type port: int
137
        :param command_code: A command code constant, a value from 0 to 127
138
        :type command_code: int
139
        :return: An integer combining the two values into a single byte
140
        """
141
        if port > 127 or port < 0:
142
            raise Exception("port out of range")
143
        elif command_code > 127 or command_code < 0:
144
            raise Exception("command_Code out of range")
145
        return (port << 4) & command_code
146
147
    def start(self, mode_init=None, **kwargs):
148
        """
149
        Initializes the KISS device and commits configuration.
150
151
        See http://en.wikipedia.org/wiki/KISS_(TNC)#Command_codes
152
        for configuration names.
153
154
        :param **kwargs: name/value pairs to use as initial config values.
155
        """
156
        self.logger.debug("kwargs=%s", kwargs)
157
158
        if 'tcp' in self.interface_mode:
159
            address = (self.host, self.tcp_port)
160
            self.interface = socket.create_connection(address)
161
        elif 'serial' in self.interface_mode:
162
            self.interface = serial.Serial(port=self.com_port, baudrate=self.baud, parity=self.parity,
163
                                           stopbits=self.stop_bits, bytesize=self.byte_size)
164
            self.interface.timeout = kissConstants.SERIAL_TIMEOUT
165
            if mode_init is not None:
166
                self.interface.write(mode_init)
167
                self.exit_kiss = True
168
169
        # Previous verious defaulted to Xastir-friendly configs. Unfortunately
170
        # those don't work with Bluetooth TNCs, so we're reverting to None.
171
        if 'serial' in self.interface_mode and kwargs:
172
            for name, value in kwargs.items():
173
                self.write_setting(name, value)
174
175
        # If no settings specified, default to config values similar
176
        # to those that ship with Xastir.
177
        # if not kwargs:
178
        #    kwargs = kiss.constants.DEFAULT_KISS_CONFIG_VALUES
179
180
    def close(self):
181
        if self.exit_kiss is True:
182
            self.interface.write(kissConstants.MODE_END)
183
184
    def write_setting(self, name, value):
185
        """
186
        Writes KISS Command Codes to attached device.
187
188
        http://en.wikipedia.org/wiki/KISS_(TNC)#Command_Codes
189
190
        :param name: KISS Command Code Name as a string.
191
        :param value: KISS Command Code Value to write.
192
        """
193
        self.logger.debug('Configuring %s = %s', name, repr(value))
194
195
        # Do the reasonable thing if a user passes an int
196
        if isinstance(value, int):
197
            value = chr(value)
198
199
        return self.interface.write(
200
            kissConstants.FEND +
201
            getattr(kissConstants, name.upper()) +
202
            Kiss.__escape_special_codes(value) +
203
            kissConstants.FEND
204
        )
205
206
    def fill_buffer(self):
207
        """
208
        Reads any pending data in the interface and stores it in the frame_buffer
209
        """
210
211
        new_frames = []
212
        read_buffer = []
213
        read_data = self.__read_interface()
214
        while read_data is not None and len(read_data):
215
            split_data = [[]]
216
            for read_byte in read_data:
217
                if read_byte is kissConstants.FEND:
218
                    split_data.append([])
219
                else:
220
                    split_data[-1].append(read_byte)
221
            len_fend = len(split_data)
222
223
            # No FEND in frame
224
            if len_fend == 1:
225
                read_buffer += split_data[0]
226
            # Single FEND in frame
227
            elif len_fend == 2:
228
                # Closing FEND found
229
                if split_data[0]:
230
                    # Partial frame continued, otherwise drop
231
                    new_frames.append(read_buffer + split_data[0])
232
                    read_buffer = []
233
                # Opening FEND found
234
                else:
235
                    new_frames.append(read_buffer)
236
                    read_buffer = split_data[1]
237
            # At least one complete frame received
238
            elif len_fend >= 3:
239
                for i in range(0, len_fend - 1):
240
                    read_buffer_tmp = read_buffer + split_data[i]
241
                    if len(read_buffer_tmp) is not 0:
242
                        new_frames.append(read_buffer_tmp)
243
                        read_buffer = []
244
                if split_data[len_fend - 1]:
245
                    read_buffer = split_data[len_fend - 1]
246
            # Get anymore data that is waiting
247
            read_data = self.__read_interface()
248
249
        for new_frame in new_frames:
250
            if len(new_frame) and new_frame[0] == 0:
251
                if self.strip_df_start:
252
                    new_frame = Kiss.__strip_df_start(new_frame)
253
                self.frame_buffer.append(new_frame)
254
255
    def read(self):
256
        if not len(self.frame_buffer):
257
            self.fill_buffer()
258
259
        if len(self.frame_buffer):
260
            return_frame = self.frame_buffer[0]
261
            del self.frame_buffer[0]
262
            return return_frame
263
        else:
264
            return None
265
266
    def write(self, frame_bytes, port=0):
267
        """
268
        Writes frame to KISS interface.
269
270
        :param frame: Frame to write.
271
        """
272
        kiss_packet = [kissConstants.FEND] + [Kiss.__command_byte_combine(port, kissConstants.DATA_FRAME)] +\
273
            Kiss.__escape_special_codes(frame_bytes) + [kissConstants.FEND]
274
275
        if 'tcp' in self.interface_mode:
276
            return self.interface.send(bytearray(kiss_packet))
277
        elif 'serial' in self.interface_mode:
278
            return self.interface.write(kiss_packet)
279