Failed Conditions
Pull Request — master (#1295)
by Lasse
01:31
created

coalib.tests.output.dbus.make_test_server()   A

Complexity

Conditions 1

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 22
rs 9.2
1
import sys
2
import os
3
import unittest
4
from unittest.case import skipIf
5
import subprocess
6
import time
7
8
sys.path.insert(0, ".")
9
from coalib.misc import Constants
10
11
try:
12
    import dbus
13
    # Needed to determine if test needs skipping
14
    from gi.repository import GLib
0 ignored issues
show
Unused Code introduced by
Unused GLib imported from gi.repository
Loading history...
15
16
    skip, message = False, ''
17
except ImportError as err:
18
    skip, message = True, str(err)
19
20
21
def make_test_server():
    """
    Start a separate Python interpreter that hosts the test DbusServer.

    The service is exported under the bus name
    ``org.coala_analyzer.v1.test`` at the object path
    ``/org/coala_analyzer/v1/test`` on the session bus.

    :return: The ``subprocess.Popen`` handle of the spawned interpreter.
    """
    # The dbus service has to live in its own process: hosting it in this
    # process gives SegmentationFaults because the same bus is being used.
    #
    # The script is handed over inline via ``-c``; for some reason this
    # also fails on some systems if it is moved to another file.
    server_script = """
import sys
import dbus
import dbus.mainloop.glib
from gi.repository import GLib
from coalib.output.dbus.DbusServer import DbusServer
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
session_bus = dbus.SessionBus()
dbus_name = dbus.service.BusName("org.coala_analyzer.v1.test", session_bus)
dbus_server = DbusServer(session_bus, "/org/coala_analyzer/v1/test",
                         on_disconnected=lambda: GLib.idle_add(sys.exit))
mainloop = GLib.MainLoop()
mainloop.run()
"""
    command = [sys.executable, '-c', server_script]
    return subprocess.Popen(command)
43
44
45
@skipIf(skip, message)
class DbusTest(unittest.TestCase):
    """
    Exercise the dbus API against a server started by make_test_server().

    The whole class is skipped when the optional dbus/GLib imports at the
    top of this module failed.
    """

    def setUp(self):
        """
        Resolve the fixture paths, start the test server and connect to it.

        :raises dbus.exceptions.DBusException: If the server could not be
                                               reached within 10 attempts.
        """
        self.config_path = os.path.abspath(
            os.path.join(os.path.dirname(__file__),
                         "dbus_test_files",
                         ".coafile"))
        self.testcode_c_path = os.path.abspath(
            os.path.join(os.path.dirname(__file__),
                         "dbus_test_files",
                         "testcode.c"))

        self.subprocess = make_test_server()
        # unittest does not call tearDown() when setUp() raises, so the
        # server process is registered for cleanup right after it is spawned.
        self.addCleanup(self._stop_test_server)

        trials_left = 10

        # The server needs a moment to claim its bus name; poll for it.
        while trials_left > 0:
            time.sleep(0.1)
            trials_left = trials_left - 1
            try:
                self.connect_to_test_server()
                # Connected - stop polling instead of burning the remaining
                # trials (and risking a spurious failure on a later one).
                break
            except dbus.exceptions.DBusException:
                if trials_left == 0:
                    raise

    def connect_to_test_server(self):
        """
        Open the session bus and fetch the proxy of the test server object.

        Sets ``self.bus`` and ``self.remote_object``.
        """
        self.bus = dbus.SessionBus()
        self.remote_object = self.bus.get_object("org.coala_analyzer.v1.test",
                                                 "/org/coala_analyzer/v1/test")

    def test_dbus(self):
        """
        Walk through document creation, config handling, analysis and
        disposal over the ``org.coala_analyzer.v1`` interface.
        """
        self.document_object_path = self.remote_object.CreateDocument(
            self.testcode_c_path,
            dbus_interface="org.coala_analyzer.v1")

        self.assertRegex(str(self.document_object_path),
                         r"^/org/coala_analyzer/v1/test/\d+/documents/\d+$")

        self.document_object = self.bus.get_object(
            "org.coala_analyzer.v1.test",
            self.document_object_path)

        config_file = self.document_object.SetConfigFile(
            "dummy_config",
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(config_file, "dummy_config")

        config_file = self.document_object.GetConfigFile(
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(config_file, "dummy_config")

        config_file = self.document_object.FindConfigFile(
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(config_file, self.config_path)

        analysis = self.document_object.Analyze(
            dbus_interface="org.coala_analyzer.v1")
        self.maxDiff = None
        # The result ids are not predictable, so the expected value reuses
        # whatever ids the analysis returned.
        self.assertEqual(analysis,
                         (1,
                          [],
                          [('default',
                            True,
                            [{'debug_msg': '',
                              'file': '',
                              'id': analysis[2][0][2][0]['id'],
                              'line_nr': "",
                              'message': 'test msg',
                              'origin': 'LocalTestBear',
                              'severity': 'NORMAL'},
                             {'debug_msg': '',
                              'file': self.testcode_c_path,
                              'id': analysis[2][0][2][1]['id'],
                              'line_nr': "",
                              'message': 'test msg',
                              'origin': 'GlobalTestBear',
                              'severity': 'NORMAL'}])]))

        # Point the document at a config path derived from the real one by
        # appending "2"; the analysis is expected to report a crash.
        self.document_object.SetConfigFile(
            self.config_path + "2",
            dbus_interface="org.coala_analyzer.v1")
        analysis = self.document_object.Analyze(
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(analysis[0], 255)
        self.assertEqual(analysis[1][1]["log_level"], "ERROR")
        self.assertEqual(analysis[1][1]["message"], Constants.CRASH_MESSAGE)

        # Skip file if file pattern doesn't match
        # Also test if 2 documents can be opened simultaneously
        self.document_object_path = self.remote_object.CreateDocument(
            "test.unknown_ext",
            dbus_interface="org.coala_analyzer.v1")
        self.document_object = self.bus.get_object(
            "org.coala_analyzer.v1.test",
            self.document_object_path)
        self.document_object.SetConfigFile(
            self.config_path,
            dbus_interface="org.coala_analyzer.v1")
        analysis = self.document_object.Analyze(
            dbus_interface="org.coala_analyzer.v1")
        self.assertEqual(analysis, (0, [], []))

        self.remote_object.DisposeDocument(
            self.testcode_c_path,
            dbus_interface="org.coala_analyzer.v1")

        self.remote_object.DisposeDocument(
            "test.unknown_ext",
            dbus_interface="org.coala_analyzer.v1")

    def _stop_test_server(self):
        """
        Kill the test server process, if any, and wait for it to exit.

        Safe to call more than once: the handle is cleared after the first
        call.
        """
        server = getattr(self, "subprocess", None)
        if server:
            server.kill()
            # Reap the child so it does not linger as a zombie process.
            server.wait()
            self.subprocess = None

    def tearDown(self):
        """
        Stop the test server process started in setUp().
        """
        self._stop_test_server()
159
160
161
# Run the tests when executed as a script, unless the optional dbus
# dependencies are missing.
if __name__ == "__main__":
    if not skip:
        unittest.main(verbosity=2)
163