A
last analyzed

Complexity

Total Complexity 30

Size/Duplication

Total Lines 362
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 30
c 0
b 0
f 0
dl 0
loc 362
rs 10

22 Methods

Rating   Name   Duplication   Size   Complexity  
A estOBDScanner.test_disconnect() 0 23 1
A estOBDScanner.pid() 0 7 1
A estOBDScanner.test_connection() 0 11 1
A estOBDScanner.setUp() 0 2 1
A estOBDScanner.read_character() 0 7 1
A estOBDScanner.test_is_port() 0 9 1
A estOBDScanner.test_vehicle_id_number() 0 19 3
A estOBDScanner.send() 0 6 1
A estOBDScanner.test_receive() 0 47 3
A estOBDScanner.test__check_response() 0 6 1
A .gen() 0 4 2
B estOBDScanner.test_get_basic_info() 0 40 5
A estOBDScanner.test_echo_off() 0 14 2
A estOBDScanner.test__write() 0 12 1
A estOBDScanner.mode() 0 10 2
A estOBDScanner.test_get_proto_num() 0 4 1
A .__init__() 0 5 3
A estOBDScanner.test_reset() 0 9 1
B estOBDScanner.test_clear_trouble_codes() 0 29 2
A estOBDScanner.test_battery_voltage() 0 5 1
B estOBDScanner.test_initialize() 0 73 5
B estOBDScanner.test_send() 0 42 1
1
import unittest
2
import sys
3
4
if sys.version_info[0] < 3:
5
    import mock
6
else:
7
    import unittest.mock as mock
8
9
import obdlib.scanner as scanner
10
11
12
class TestOBDScanner(unittest.TestCase):
13
    def setUp(self):
14
        self.scan = scanner.OBDScanner("/dev/null")
15
16
    @mock.patch('obdlib.scanner.OBDScanner.is_port')
17
    @mock.patch('obdlib.scanner.OBDScanner.initialize')
18
    @mock.patch('obdlib.uart.UART.connection')
19
    def test_connection(self, mock_uart, mock_init, mock_is_port):
20
        mock_is_port.return_value = True
21
        self.scan.connect()
22
        self.assertEqual(mock_init.call_count, 1)
23
        self.assertEqual(mock_uart.call_count, 1)
24
        self.assertEqual(mock_init.call_args_list[0][0], ())
25
        self.assertEqual(mock_uart.call_args_list[0][0], ('/dev/null',))
26
        self.assertEqual(mock_uart.call_args_list[0][1], {'baudrate': 38400})
27
28
    def test_is_port(self):
29
        # False
30
        self.scan.uart_port = None
31
        is_port = self.scan.is_port()
32
        self.assertFalse(is_port)
33
        # True
34
        self.scan.uart_port = object
35
        is_port = self.scan.is_port()
36
        self.assertTrue(is_port)
37
38
    @mock.patch('obdlib.scanner.OBDScanner.vehicle_id_number')
39
    @mock.patch('obdlib.scanner.OBDScanner.battery_voltage')
40
    def test_get_basic_info(self, mock_voltage, mock_vin):
41
        mock_vin.return_value = {'E8': '2312KJLKJHLKJ43L34323'}
42
        mock_voltage.return_value = 12.7
43
        self.scan.sensor = mock.MagicMock()
44
45
        class p(object):
46
            def __init__(self, n):
47
                if int(n, 16) == 28:
48
                    self.values = {'E8': 'OBD and OBD-II'}
49
                if int(n, 16) == 81:
50
                    self.values = {'E8': 'Gasoline'}
51
52
            @property
53
            def gen(self):
54
                for ecu, value in self.values.items():
55
                    yield (ecu, value)
56
57
        def mode(a):
58
            def pid(n):
59
                pid.ecus = p(n).gen
60
                if int(n, 16) == 28:
61
                    pid.title = 'OBD'
62
                if int(n, 16) == 81:
63
                    pid.title = 'FUEL_TYPE'
64
                return pid
65
66
            return pid
67
68
        self.scan.sensor.__getitem__.side_effect = mode
69
        info = self.scan.get_basic_info()
70
        self.assertEqual(mock_vin.call_count, 1)
71
        self.assertEqual(mock_vin.call_args_list[0][0], ())
72
        self.assertEqual(info,
73
                         {
74
                             'BATTERY_VOLTAGE': 12.7,
75
                             'OBD': {'E8': 'OBD and OBD-II'},
76
                             'VIN': {'E8': '2312KJLKJHLKJ43L34323'},
77
                             'FUEL_TYPE': {'E8': 'Gasoline'}
78
                         })
79
80
    @mock.patch('obdlib.scanner.OBDScanner.send')
81
    def test_battery_voltage(self, mock_send):
82
        volt = self.scan.battery_voltage()
83
        self.assertEqual(mock_send.call_count, 1)
84
        self.assertEqual(mock_send.call_args_list[0][0], ('ATRV',))
85
86
    @mock.patch('obdlib.scanner.OBDScanner.reset')
87
    @mock.patch('obdlib.scanner.OBDScanner.is_port')
88
    def test_disconnect(self, mock_is_port, mock_reset):
89
        # is_port True
90
        mock_is_port.return_value = True
91
        self.scan.uart_port = mock.Mock()
92
        self.scan.disconnect()
93
        self.assertEqual(mock_is_port.call_count, 1)
94
        self.assertEqual(mock_is_port.call_args_list[0][0], ())
95
        self.assertEqual(mock_reset.call_count, 1)
96
        self.assertEqual(mock_reset.call_args_list[0][0], ())
97
        self.assertEqual(self.scan.uart_port.close.call_count, 1)
98
        self.assertEqual(self.scan.uart_port.close.call_args_list[0][0], ())
99
        # is_port False
100
        mock_is_port.reset_mock()
101
        mock_reset.reset_mock()
102
        self.scan.uart_port.reset_mock()
103
        mock_is_port.return_value = False
104
        self.scan.disconnect()
105
        self.assertEqual(mock_is_port.call_count, 1)
106
        self.assertEqual(mock_is_port.call_args_list[0][0], ())
107
        self.assertEqual(mock_reset.call_count, 0)
108
        self.assertEqual(self.scan.uart_port.close.call_count, 0)
109
110
    @mock.patch('sys.stdout')
111
    @mock.patch('obdlib.scanner.sensors.Command')
112
    @mock.patch('obdlib.scanner.OBDScanner.send')
113
    @mock.patch('obdlib.scanner.OBDScanner.echo_off')
114
    @mock.patch('obdlib.scanner.OBDScanner.reset')
115
    def test_initialize(self, mock_reset, mock_echo, mock_send, mock_sensor, mock_out):
116
        exception = ''
117
118
        def send(data):
119
            class r_v():
120
                raw_value = r_value
121
                at_value = a_value
122
123
            return r_v
124
125
        # if connected, each request returns OK
126
        mock_echo.return_value = ['OK']
127
        r_value = ['OK']
128
        a_value = 'A5'
129
        send.counter = 0
130
        mock_send.side_effect = send
131
        self.scan.initialize()
132
133
        self.assertEqual(mock_reset.call_count, 1)
134
        self.assertEqual(mock_reset.call_args_list[0][0], ())
135
136
        self.assertEqual(mock_echo.call_count, 1)
137
        self.assertEqual(mock_echo.call_args_list[0][0], ())
138
139
        self.assertEqual(mock_send.call_count, 6)
140
        self.assertEqual(mock_sensor.call_args_list[0][0], (mock_send, 0))
141
142
        # if not connected
143
        mock_reset.reset_mock()
144
        mock_echo.reset_mock()
145
        mock_send.reset_mock()
146
        mock_sensor.reset_mock()
147
        mock_echo.return_value = ['OK']
148
        r_value = ['OK']
149
        a_value = 'A5'
150
        self.scan.sensor.check_pids.return_value = False
151
        with self.assertRaises(Exception) as cm:
152
            self.scan.initialize()
153
        self.assertEqual(cm.exception.__str__(), 'Failed connection to the OBD2 interface!')
154
155
        # Exception, check Echo
156
        mock_reset.reset_mock()
157
        mock_echo.reset_mock()
158
        mock_send.reset_mock()
159
        mock_sensor.reset_mock()
160
161
        mock_echo.return_value = ['']
162
        r_value = ['']
163
        a_value = 'A5'
164
        send.counter = 0
165
        mock_send.side_effect = send
166
        with self.assertRaises(Exception) as cm:
167
            self.scan.initialize()
168
        self.assertEqual(cm.exception.__str__(), 'Echo command did not completed')
169
170
        mock_reset.reset_mock()
171
        mock_echo.reset_mock()
172
        mock_send.reset_mock()
173
        mock_sensor.reset_mock()
174
175
        mock_echo.return_value = ['OK']
176
        r_value = ['']
177
        a_value = 'A5'
178
        mock_send.side_effect = send
179
180
        with self.assertRaises(Exception) as cm:
181
            self.scan.initialize()
182
        self.assertEqual(cm.exception.__str__(), "Set protocol command did not completed")
183
184
    def test_get_proto_num(self):
185
        self.scan.obd_protocol = 'AA'
186
        num = self.scan.get_proto_num()
187
        self.assertEqual(num, 10)
188
189
    @mock.patch('obdlib.scanner.OBDScanner.get_proto_num')
190
    @mock.patch('obdlib.scanner.OBDScanner.is_port')
191
    def test_receive(self, mock_is_port, mock_proto_num):
192
        from obdlib.response import Response
193
194
        buffer = 'ATZ\r\x00\n>'
195
196
        def read_character(n):
197
            """
198
                Emulate UART buffer
199
            """
200
            r = buffer[read_character.counter]
201
            read_character.counter += n
202
            return r.encode()
203
204
        # if connection
205
        mock_is_port.return_value = True
206
        mock_proto_num.return_value = 5
207
        self.scan.uart_port = mock.Mock()
208
209
        read_character.counter = 0
210
        self.scan.uart_port.read.side_effect = read_character
211
        response = self.scan.receive()
212
        self.assertIsInstance(response, Response)
213
        self.assertEqual(self.scan.uart_port.read.call_count, len(buffer))
214
        self.assertEqual(self.scan.uart_port.read.call_args_list[0][0], (1,))
215
216
        # if have not connection
217
        mock_is_port.reset_mock()
218
        mock_proto_num.reset_mock()
219
        mock_is_port.return_value = False
220
        with self.assertRaises(Exception) as cm:
221
            self.scan.receive()
222
        self.assertEqual(cm.exception.__str__(), 'Cannot read when unconnected')
223
224
        # if connection but nothing were received
225
        mock_is_port.reset_mock()
226
        mock_proto_num.reset_mock()
227
        mock_is_port.return_value = True
228
        mock_proto_num.return_value = 5
229
        self.scan.uart_port = mock.Mock()
230
231
        self.scan.uart_port.read.return_value = b''
232
        response = self.scan.receive()
233
        self.assertIsInstance(response, Response)
234
        self.assertEqual(self.scan.uart_port.read.call_count, 11)
235
        self.assertEqual(self.scan.uart_port.read.call_args_list[0][0], (1,))
236
237
    @mock.patch('obdlib.scanner.OBDScanner.send')
238
    @mock.patch('obdlib.scanner.OBDScanner.is_port')
239
    def test_reset(self, mock_is_port, mock_send):
240
        mock_is_port.return_value = True
241
        self.scan.reset()
242
        self.assertEqual(mock_is_port.call_count, 1)
243
        self.assertEqual(mock_is_port.call_args_list[0][0], ())
244
        self.assertEqual(mock_send.call_count, 1)
245
        self.assertEqual(mock_send.call_args_list[0][0], ('ATZ', 1))
246
247
    @mock.patch('obdlib.scanner.OBDScanner.receive')
248
    @mock.patch('obdlib.scanner.OBDScanner._write')
249
    @mock.patch('obdlib.scanner.OBDScanner.is_port')
250
    @mock.patch('obdlib.scanner.time.sleep')
251
    def test_send(self, mock_sleep, mock_is_port, mock_write, mock_receive):
252
        from obdlib.response import Response
253
254
        mock_is_port.return_value = True
255
        mock_receive.return_value = Response()
256
        resp = self.scan.send('0100')
257
258
        self.assertEqual(mock_is_port.call_count, 1)
259
        self.assertEqual(mock_is_port.call_args_list[0][0], ())
260
261
        self.assertEqual(mock_sleep.call_count, 0)
262
263
        self.assertEqual(mock_write.call_count, 1)
264
        self.assertEqual(mock_write.call_args_list[0][0], ('0100',))
265
266
        self.assertEqual(mock_receive.call_count, 1)
267
        self.assertEqual(mock_receive.call_args_list[0][0], ())
268
269
        self.assertIsInstance(resp, Response)
270
271
        mock_is_port.reset_mock()
272
        mock_write.reset_mock()
273
        mock_receive.reset_mock()
274
        resp = self.scan.send('0100', 1)
275
276
        self.assertEqual(mock_is_port.call_count, 1)
277
        self.assertEqual(mock_is_port.call_args_list[0][0], ())
278
279
        self.assertEqual(mock_sleep.call_count, 1)
280
        self.assertEqual(mock_sleep.call_args_list[0][0], (1,))
281
282
        self.assertEqual(mock_write.call_count, 1)
283
        self.assertEqual(mock_write.call_args_list[0][0], ('0100',))
284
285
        self.assertEqual(mock_receive.call_count, 1)
286
        self.assertEqual(mock_receive.call_args_list[0][0], ())
287
288
        self.assertIsInstance(resp, Response)
289
290
    def test_vehicle_id_number(self):
291
        self.scan.sensor = mock.MagicMock()
292
293
        class p(object):
294
            @property
295
            def gen(self):
296
                for ecu, value in {'E8': '2312KJLKJHLKJ43L34323'}.items():
297
                    yield (ecu, value)
298
299
        def mode(a):
300
            def pid(n):
301
                pid.ecus = p().gen
302
                return pid
303
304
            return pid
305
306
        self.scan.sensor.__getitem__.side_effect = mode
307
        vin = self.scan.vehicle_id_number()
308
        self.assertEqual(vin, {'E8': '2312KJLKJHLKJ43L34323'})
309
310
    @mock.patch('sys.stdout')
311
    @mock.patch('obdlib.scanner.OBDScanner.send')
312
    def test_clear_trouble_codes(self, mock_send, mock_out):
313
        def send(data):
314
            class r_v():
315
                raw_value = value
316
317
            return r_v
318
319
        # Return OK
320
        value = ['OK']
321
        mock_send.side_effect = send
322
        resp = self.scan.clear_trouble_codes()
323
324
        self.assertEqual(mock_send.call_count, 1)
325
        self.assertEqual(mock_send.call_args_list[0][0], ('04',))
326
327
        self.assertIsNone(resp)
328
329
        # Do not return OK
330
        mock_send.reset_mock()
331
        value = ['?']
332
        mock_send.side_effect = send
333
        resp = self.scan.clear_trouble_codes()
334
335
        self.assertEqual(mock_send.call_count, 1)
336
        self.assertEqual(mock_send.call_args_list[0][0], ('04',))
337
338
        self.assertIsNone(resp)
339
340
    @mock.patch('obdlib.scanner.OBDScanner.send')
341
    def test_echo_off(self, mock_send):
342
        def send(data):
343
            class r_v():
344
                raw_value = value
345
346
            return r_v
347
348
        value = ['OK']
349
        mock_send.side_effect = send
350
        resp = self.scan.echo_off()
351
        self.assertEqual(mock_send.call_count, 1)
352
        self.assertEqual(mock_send.call_args_list[0][0], ('ATE0',))
353
        self.assertEqual(resp, ['OK'])
354
355
    def test__check_response(self):
356
        resp = self.scan._check_response(['OK'])
357
        self.assertTrue(resp)
358
359
        resp = self.scan._check_response(['?'])
360
        self.assertFalse(resp)
361
362
    def test__write(self):
363
        self.scan.uart_port = mock.Mock()
364
        self.scan._write('ATH1')
365
366
        self.assertEqual(self.scan.uart_port.flushOutput.call_count, 1)
367
        self.assertEqual(self.scan.uart_port.flushOutput.call_args_list[0][0], ())
368
369
        self.assertEqual(self.scan.uart_port.flushInput.call_count, 1)
370
        self.assertEqual(self.scan.uart_port.flushInput.call_args_list[0][0], ())
371
372
        self.assertEqual(self.scan.uart_port.write.call_count, 1)
373
        self.assertEqual(self.scan.uart_port.write.call_args_list[0][0], (b'ATH1\r\n',))
374
375
376
if __name__ == '__main__':
377
    suite = unittest.TestLoader().loadTestsFromTestCase(TestOBDScanner)
378
    unittest.TextTestRunner(verbosity=2).run(suite)
379
380