OBDScanner   B
last analyzed

Complexity

Total Complexity 51

Size/Duplication

Total Lines 294
Duplicated Lines 0 %

Importance

Changes 19
Bugs 0 Features 1
Metric Value
wmc 51
c 19
b 0
f 1
dl 0
loc 294
rs 8.3206

25 Methods

Rating   Name   Duplication   Size   Complexity  
A connect() 0 11 2
A set_protocol() 0 6 2
A check_spaces_off() 0 4 2
A battery_voltage() 0 6 1
B collect_data() 0 25 6
A receive() 0 15 3
A header_on() 0 6 2
A __init__() 0 18 1
A echo_off() 0 6 1
A __exit__() 0 2 1
A get_proto_num() 0 9 4
A _check_response() 0 5 1
A check_echo_off() 0 5 2
A is_port() 0 5 1
B get_basic_info() 0 29 4
A send() 0 16 3
A _write() 0 11 1
A check_feed_off() 0 4 2
B initialize() 0 27 1
A reset() 0 7 2
A vehicle_id_number() 0 10 2
A clear_trouble_codes() 0 10 2
A check_connection() 0 5 2
A disconnect() 0 10 2
A __enter__() 0 7 1

How to fix   Complexity   

Complex Class

Complex classes like OBDScanner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import time
2
import obdlib.uart as uart
3
import obdlib.elm327 as elm327
4
from obdlib.obd import commands
5
from obdlib.obd import sensors
6
from obdlib.response import Response
7
from obdlib.logging import logger
8
9
10
class OBDScanner(object):
    """
        ELM327 OBD-II Scanner

        Opens a UART port to an ELM327 adapter, sends the setup commands
        defined in ``obdlib.elm327`` and exposes the vehicle readings
        through ``obdlib.obd.sensors.Command``.

        Information about OBD-II PIDs
        http://en.wikipedia.org/wiki/OBD-II_PIDs

        Additional details about ELM327 OBD <-> RS232 found here:
        http://elmelectronics.com/DSheets/ELM327DS.pdf
    """

    def __init__(self, pb_str, baud=uart.DEFAULT_BAUDRATE, units=0):
        """
            Init params
            :param pb_str: port or bus number|name
            :param baud: it is the clock rate
            :param units: default units for system readings
            (0 - Europe, 1 - English)
        """
        self.pb_str = pb_str
        self.baud = baud
        self.uart_port = None
        self.elm_version = ""
        self.obd_protocol = ""
        self.units = units
        # substring that marks a successful adapter reply
        self.success = "OK"
        self.sensor = None
        # becomes truthy once the PID check in initialize() succeeded,
        # i.e. the connection with a vehicle is working
        self.__connected = False

    def connect(self):
        """
            Opens a connection to an ELM327 OBD-II Interface and, when a
            port was obtained, runs the initialization sequence
            :return:
        """
        self.uart_port = uart.UART().connection(
            self.pb_str,
            baudrate=self.baud)

        if self.is_port():
            self.initialize()

    def is_port(self):
        """ Returns a boolean for whether a successful connection
            with port was made
        """
        return self.uart_port is not None

    def __enter__(self):
        """
            Sets up the OBDScanner to work as a ContextManager
            :return: this OBDScanner instance for use within the context
        """
        try:
            self.connect()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the port
            # opened by connect() has to be released here
            self._close_port()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
            Disconnects on leaving the context; exceptions raised inside
            the context are not suppressed
        """
        self.disconnect()

    def battery_voltage(self):
        """
            Reads the vehicle's battery voltage from a connected OBD-II Scanner
            :return: the battery voltage returned by the OBD-II Scanner
        """
        return self.send(elm327.BATTERY_VOLTAGE_COMMAND).at_value

    def disconnect(self):
        """
            Disconnect from a connected OBD-II Scanner.
            The adapter is reset first; the port is closed even when the
            reset fails (the failure is still propagated)
            :return:
        """
        try:
            if self.is_port():
                self.reset()
        finally:
            self.__connected = False
            self.elm_version = ""
            self._close_port()

    def initialize(self):
        """
            Initialize the OBD-II Scanner state after connecting
            :return:
            :raise Exception: when a mandatory setup command fails or the
            vehicle connection check does not succeed
        """
        self.reset()
        self.check_echo_off()
        self.check_spaces_off()
        self.check_feed_off()

        # Disable memory function
        self.send(elm327.MEMORY_OFF_COMMAND)

        self.set_protocol()
        self.header_on()

        self.sensor = sensors.Command(self.send, self.units)
        # checks connection with vehicle
        self.__connected = self.sensor.check_pids()

        self.check_connection()

        self.obd_protocol = self.send(
            elm327.DESCRIBE_PROTOCOL_NUMBER_COMMAND).at_value

        # gets available pids (repeated now that the protocol number,
        # which receive() hands to Response, is known)
        self.sensor.check_pids()

    def check_connection(self):
        """
            Verifies that the vehicle connection check succeeded
            :raise Exception: when the connection flag is not set
        """
        if not self.__connected:
            self._raise_error("Failed connection to the OBD2 interface!")

    def header_on(self):
        """
            Sends the header-on command
            :raise Exception: when the reply does not contain the
            success marker
        """
        if not self._check_response(
                self.send(elm327.HEADER_ON_COMMAND).raw_value):
            self._raise_error("Enable header command did not completed")

    def set_protocol(self):
        """
            Sends the automatic protocol selection command
            :raise Exception: when the reply does not contain the
            success marker
        """
        if not self._check_response(
                self.send(elm327.SET_PROTOCOL_AUTO_COMMAND).raw_value):
            self._raise_error("Set protocol command did not completed")

    def check_echo_off(self):
        """
            Turns the echo off
            :raise Exception: when the reply does not contain the
            success marker
        """
        if not self._check_response(self.echo_off()):
            self._raise_error("Echo command did not completed")

    def check_spaces_off(self):
        """
            Sends the spaces-off command; a failure is only logged
        """
        if not self._check_response(
                self.send(elm327.SPACES_OFF_COMMAND).raw_value):
            logger.warning("Spaces off command did not completed")

    def check_feed_off(self):
        """
            Sends the line-feed-off command; a failure is only logged
        """
        if not self._check_response(
                self.send(elm327.LINEFEED_OFF_COMMAND).raw_value):
            logger.warning("Line feed off command did not completed")

    def get_proto_num(self):
        """
            Retrieves the protocol number (response format is - A4)

            Accepts the two-character form with a leading ``A`` as well as
            a bare single hexadecimal digit.
            :return: the protocol number as int, 0 when the stored value
            is empty or cannot be parsed
        """
        number = self.obd_protocol
        if not number:
            return 0

        text = str(number).strip().upper()
        if len(text) == 2 and text[0] == 'A':
            # drop the leading 'A' of the "A4"-style reply
            text = text[1]

        if len(text) != 1:
            return 0
        try:
            return int(text, 16)
        except ValueError:
            return 0

    def receive(self):
        """
            Receive data from connected OBD-II Scanner
            :return: the data returned by the OBD-II Scanner, wrapped in a
            Response (an empty Response when nothing was read)
            :raise Exception: when no port is open
        """
        if not self.is_port():
            self._raise_error("Cannot read when unconnected")

        value = self.collect_data()
        if value:
            return Response(value, self.get_proto_num())

        return Response()

    def collect_data(self):
        """
            Listens an UART port and
            retrieves data from one

            Reads byte by byte until the ``>`` prompt arrives or the
            number of empty reads exceeds ``elm327.DEFAULT_RETRIES``.
            :return: the collected bytes, without the prompt and without
            NULL bytes
        """
        retry_number = 0
        chunks = []
        while True:
            data = self.uart_port.read(1)

            # nothing received: empty bytes, or None
            # NOTE(review): some UART implementations presumably return
            # None on timeout - confirm against obdlib.uart
            if not data:
                if retry_number >= elm327.DEFAULT_RETRIES:
                    break
                retry_number += 1
                continue

            if data == b'>':
                break

            # ignore incoming bytes that are of value 00 (NULL)
            if data == b'\x00':
                continue

            chunks.append(data)
        return b''.join(chunks)

    def reset(self):
        """
            Reset the OBD-II Scanner and store its reply as the ELM version
            (does nothing when no port is open)
            :return:
        """
        if self.is_port():
            self.elm_version = self.send(elm327.RESET_COMMAND, 1).at_value

    def send(self, data, wait=None):
        """
            Send data/command to the connected OBD-II Scanner
            :param data: the data/command to send to the connected OBD-II
            scanner
            :param wait: the delay between write and read, in sec
            :return the data returned by the OBD-II Scanner
            :raise Exception: when no port is open (raised by receive())
        """
        if self.is_port():
            self._write(data)

            # Wait for data to become available; pointless without a
            # write, so the delay is skipped when there is no port
            if wait:
                time.sleep(wait)

        return self.receive()

    def vehicle_id_number(self):
        """
            Returns the vehicle's identification number (VIN)
            :return dict {ecu: number}
        """
        return self._read_per_ecu(9, '02')[1]

    def get_basic_info(self):
        """
            Returns general vehicle's information
            :return dict
        """
        gen_info = {}

        # complies with OBD std
        sensor, obd = self._read_per_ecu(1, '1C')
        gen_info[sensor.title] = obd

        # fuel type (stored only when at least one ECU answered)
        sensor, f_type = self._read_per_ecu(1, '51')
        if f_type:
            gen_info[sensor.title] = f_type

        # get VIN
        gen_info['VIN'] = self.vehicle_id_number()

        # battery voltage
        gen_info['BATTERY_VOLTAGE'] = self.battery_voltage()

        return gen_info

    def clear_trouble_codes(self):
        """
            Uses OBD Mode 04 to clear trouble codes and the malfunction
            indicator lamp (MIL) / check engine light
            :return:
        """
        if not self._check_response(
                self.send(commands.CLEAR_TROUBLE_CODES_COMMAND).raw_value):
            # logging warning
            logger.warning("Clear trouble codes did not return success")

    def echo_off(self):
        """
            Turns ECHO OFF for the OBD-II Scanner
            :return response data
        """
        return self.send(elm327.ECHO_OFF_COMMAND).raw_value

    def _check_response(self, data):
        """
            Checks the common command
            :param data: raw response data
            :return: True when the success marker is contained in data,
            False otherwise (including empty or missing data)
        """
        return bool(data) and self.success in data

    def _read_per_ecu(self, mode, pid):
        """
            Queries one PID and gathers the per-ECU values
            :param mode: index passed to the sensor command object
            :param pid: PID passed to the selected mode
            :return: tuple (sensor, {ecu: value})
        """
        sensor = self.sensor[mode](pid)
        return sensor, dict(sensor.ecus)

    def _raise_error(self, mess):
        """
            Logs the message as an error and raises it
            :param mess: the error text
            :raise Exception: always
        """
        logger.error(mess)
        raise Exception(mess)

    def _close_port(self):
        """
            Closes the UART port when one was opened; the ``uart_port``
            attribute itself is left in place
        """
        if self.is_port():
            self.uart_port.close()

    def _write(self, data):
        """
            Send data/command to the connected OBD-II Scanner
            :param data: the data/command to send to the connected OBD-II
            scanner
            :return:
        """
        # drop anything still pending in either direction so the next
        # read only sees the reply to this command
        self.uart_port.flushOutput()
        self.uart_port.flushInput()
        cmd = data + "\r\n"
        self.uart_port.write(cmd.encode())
304