Failed Conditions
Pull Request — master (#1295)
by Abdeali
01:27
created

coalib.tests.output.dbus.make_test_server()   A

Complexity

Conditions 1

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 22
rs 9.2
1
import sys
2
import os
3
import unittest
4
from unittest.case import skipIf
5
import subprocess
6
import time
7
8
sys.path.insert(0, ".")
9
from coalib.misc import Constants
10
11
try:
12
    import dbus
13
    import dbus.mainloop.glib
14
    from gi.repository import GLib
0 ignored issues
show
Unused Code introduced by
Unused GLib imported from gi.repository
Loading history...
15
16
    from coalib.output.dbus.DbusServer import DbusServer
0 ignored issues
show
Unused Code introduced by
Unused DbusServer imported from coalib.output.dbus.DbusServer
Loading history...
17
    skip, message = False, ''
18
except ImportError as err:
19
    skip, message = True, str(err)
20
21
22
def make_test_server():
    """
    Start the dbus test server in a separate Python process.

    The child process registers the bus name ``org.coala_analyzer.v1.test``
    on the session bus, exposes a ``DbusServer`` at
    ``/org/coala_analyzer/v1/test`` and then runs a GLib main loop.

    :return: The ``subprocess.Popen`` handle of the spawned process. The
             caller is responsible for terminating it.
    """
    # Make a dbus service in a new process. It cannot be in this process
    # as that gives SegmentationFaults because the same bus is being used.

    # For some reason this also fails on some systems if moved to another
    # file, hence the script is passed inline through ``-c``.
    server_script = """
import sys
import dbus
import dbus.mainloop.glib
from gi.repository import GLib
from coalib.output.dbus.DbusServer import DbusServer
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
session_bus = dbus.SessionBus()
dbus_name = dbus.service.BusName("org.coala_analyzer.v1.test", session_bus)
dbus_server = DbusServer(session_bus, "/org/coala_analyzer/v1/test",
                         on_disconnected=lambda: GLib.idle_add(sys.exit))
mainloop = GLib.MainLoop()
mainloop.run()
"""
    return subprocess.Popen([sys.executable, '-c', server_script])
44
45
46
@skipIf(skip, message)
class DbusTest(unittest.TestCase):
    """
    Exercises the coala dbus API against a server running in a child process.

    The whole class is skipped when the dbus related imports at the top of
    the module failed.
    """

    def setUp(self):
        """
        Resolve the fixture paths, start the test server and connect to it.

        The connection is attempted up to ten times, 0.1 seconds apart,
        because the freshly spawned server needs some time to claim its bus
        name.

        :raises dbus.exceptions.DBusException: If the last connection attempt
                                               fails as well.
        """
        test_files_dir = os.path.join(os.path.dirname(__file__),
                                      "dbus_test_files")
        self.config_path = os.path.abspath(
            os.path.join(test_files_dir, ".coafile"))
        self.testcode_c_path = os.path.abspath(
            os.path.join(test_files_dir, "testcode.c"))

        self.subprocess = make_test_server()
        # tearDown is not invoked when setUp raises, cleanups are. Register
        # the shutdown here so a failed connection cannot leak the server.
        self.addCleanup(self._stop_test_server)

        for trials_left in range(9, -1, -1):
            time.sleep(0.1)
            try:
                self.connect_to_test_server()
                # Connected - stop retrying instead of reconnecting again.
                break
            except dbus.exceptions.DBusException:
                if trials_left == 0:
                    raise

    def connect_to_test_server(self):
        """
        Open the session bus and fetch the proxy of the test server object.

        Stores the bus in ``self.bus`` and the proxy in
        ``self.remote_object``.
        """
        self.bus = dbus.SessionBus()
        self.remote_object = self.bus.get_object("org.coala_analyzer.v1.test",
                                                 "/org/coala_analyzer/v1/test")

    def test_dbus(self):
        """
        Walk through document creation, config handling, analysis and
        disposal over dbus.
        """
        self.document_object_path = self.remote_object.CreateDocument(
            self.testcode_c_path,
            dbus_interface="org.coala_analyzer.v1")

        self.assertRegex(str(self.document_object_path),
                         r"^/org/coala_analyzer/v1/test/\d+/documents/\d+$")

        self.document_object = self.bus.get_object(
            "org.coala_analyzer.v1.test",
            self.document_object_path)

        config_file = self.document_object.SetConfigFile(
            "dummy_config",
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(config_file, "dummy_config")

        config_file = self.document_object.GetConfigFile(
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(config_file, "dummy_config")

        config_file = self.document_object.FindConfigFile(
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(config_file, self.config_path)

        analysis = self.document_object.Analyze(
            dbus_interface="org.coala_analyzer.v1")
        self.maxDiff = None
        # The result ids are not predictable, so they are taken over from the
        # actual result to keep the comparison focused on the other fields.
        self.assertEqual(analysis,
                         (1,
                          [],
                          [('default',
                            True,
                            [{'debug_msg': '',
                              'file': '',
                              'id': analysis[2][0][2][0]['id'],
                              'line_nr': "",
                              'message': 'test msg',
                              'origin': 'LocalTestBear',
                              'severity': 'NORMAL'},
                             {'debug_msg': '',
                              'file': self.testcode_c_path,
                              'id': analysis[2][0][2][1]['id'],
                              'line_nr': "",
                              'message': 'test msg',
                              'origin': 'GlobalTestBear',
                              'severity': 'NORMAL'}])]))

        # A config file path that does not exist has to be reported as a
        # crash in the log output of the analysis.
        self.document_object.SetConfigFile(
            self.config_path + "2",
            dbus_interface="org.coala_analyzer.v1")
        analysis = self.document_object.Analyze(
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(analysis[0], 255)
        self.assertEqual(analysis[1][1]["log_level"], "ERROR")
        self.assertEqual(analysis[1][1]["message"], Constants.CRASH_MESSAGE)

        # Skip file if file pattern doesn't match
        # Also test if 2 documents can be opened simultaneously
        self.document_object_path = self.remote_object.CreateDocument(
            "test.unknown_ext",
            dbus_interface="org.coala_analyzer.v1")
        self.document_object = self.bus.get_object(
            "org.coala_analyzer.v1.test",
            self.document_object_path)
        self.document_object.SetConfigFile(
            self.config_path,
            dbus_interface="org.coala_analyzer.v1")
        analysis = self.document_object.Analyze(
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(analysis, (0, [], []))

        self.remote_object.DisposeDocument(
            self.testcode_c_path,
            dbus_interface="org.coala_analyzer.v1")

        self.remote_object.DisposeDocument(
            "test.unknown_ext",
            dbus_interface="org.coala_analyzer.v1")

    def tearDown(self):
        """
        Shut the test server process down.
        """
        self._stop_test_server()

    def _stop_test_server(self):
        """
        Kill the server process and reap it. Calling this twice is harmless.
        """
        server = getattr(self, "subprocess", None)
        if server is None:
            return

        self.subprocess = None
        server.kill()
        # Reap the child so no zombie process (and no ResourceWarning about a
        # still running subprocess) is left behind.
        server.wait()
160
161
162
if __name__ == "__main__" and not skip:
    # Nothing is run when the dbus related imports failed.
    unittest.main(verbosity=2)
164