Failed Conditions
Pull Request — master (#1182)
by Lasse
01:44
created

coalib.tests.output.dbus.test_dbus()   A

Complexity

Conditions 3

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 9
rs 9.6667
1
import sys
2
import os
3
import unittest
4
import subprocess
5
import time
6
import dbus
7
import dbus.mainloop.glib
8
from gi.repository import GLib
9
10
sys.path.insert(0, ".")
11
from coalib.output.dbus.DbusServer import DbusServer
12
from coalib.misc.Constants import Constants
13
14
15
def make_test_server():
    """Launch this file as a DBus test server in a child process.

    The service is not created in the current process because, per the
    original author's note, sharing the same bus inside one process leads
    to segmentation faults.

    :return: The ``subprocess.Popen`` handle of the spawned server.
    """
    # Re-run this very file with the "server" argument using the same
    # interpreter that is running the tests.
    server_command = [sys.executable, __file__, "server"]
    return subprocess.Popen(server_command)
22
23
24
def create_mainloop():
    """Export the test DbusServer on the session bus and run a GLib loop.

    The call blocks inside ``GLib.MainLoop.run()``. When the server reports
    a disconnect, an idle callback is scheduled that calls ``sys.exit(0)``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()

    def exit_when_idle():
        # Defer the exit to the main loop's next idle slot.
        return GLib.idle_add(lambda: sys.exit(0))

    # The BusName has to stay referenced by a variable; if it is dropped
    # the bus name is released again.
    bus_name = dbus.service.BusName("org.coala_analyzer.v1.test", bus)
    server = DbusServer(bus,
                        "/org/coala_analyzer/v1/test",
                        on_disconnected=exit_when_idle)

    GLib.MainLoop().run()
36
37
38
class DbusTest(unittest.TestCase):
39
    def setUp(self):
40
        self.config_path = os.path.abspath(
41
            os.path.join(os.path.dirname(__file__),
42
            "dbus_test_files",
43
            ".coafile"))
44
        self.testcode_c_path = os.path.abspath(
45
            os.path.join(os.path.dirname(__file__),
46
            "dbus_test_files",
47
            "testcode.c"))
48
49
        self.subprocess = make_test_server()
50
        trials_left = 10
51
52
        while trials_left > 0:
53
            time.sleep(0.1)
54
            trials_left = trials_left - 1
55
            try:
56
                self.connect_to_test_server()
57
                continue
58
            except dbus.exceptions.DBusException as exception:
59
                if trials_left == 0:
60
                    raise exception
61
62
    def connect_to_test_server(self):
63
        self.bus = dbus.SessionBus()
64
        self.remote_object = self.bus.get_object("org.coala_analyzer.v1.test",
65
                                                 "/org/coala_analyzer/v1/test")
66
67
    def test_dbus(self):
68
        self.document_object_path = self.remote_object.CreateDocument(
69
            self.testcode_c_path,
70
            dbus_interface="org.coala_analyzer.v1")
71
72
        self.assertRegex(str(self.document_object_path),
73
                         r"^/org/coala_analyzer/v1/test/\d+/documents/\d+$")
74
75
        self.document_object = self.bus.get_object(
76
            "org.coala_analyzer.v1.test",
77
            self.document_object_path)
78
79
        config_file = self.document_object.SetConfigFile(
80
            "dummy_config",
81
            dbus_interface="org.coala_analyzer.v1")
82
        self.assertEqual(config_file, "dummy_config")
83
84
        config_file = self.document_object.GetConfigFile(
85
            dbus_interface="org.coala_analyzer.v1")
86
        self.assertEqual(config_file, "dummy_config")
87
88
        config_file = self.document_object.FindConfigFile(
89
            dbus_interface="org.coala_analyzer.v1")
90
        self.assertEqual(config_file, self.config_path)
91
92
        analysis = self.document_object.Analyze(
93
            dbus_interface="org.coala_analyzer.v1")
94
        self.maxDiff = None
95
        self.assertEqual(analysis,
96
                         (1,
97
                          [],
98
                          [('default',
99
                            True,
100
                            [{'debug_msg': '',
101
                              'file': '',
102
                              'id': analysis[2][0][2][0]['id'],
103
                              'line_nr': "",
104
                              'message': 'test msg',
105
                              'origin': 'LocalTestBear',
106
                              'severity': 'NORMAL'},
107
                             {'debug_msg': '',
108
                              'file': self.testcode_c_path,
109
                              'id': analysis[2][0][2][1]['id'],
110
                              'line_nr': "",
111
                              'message': 'test msg',
112
                              'origin': 'GlobalTestBear',
113
                              'severity': 'NORMAL'}])]))
114
115
        config_file = self.document_object.SetConfigFile(
116
            self.config_path + "2",
117
            dbus_interface="org.coala_analyzer.v1")
118
        analysis = self.document_object.Analyze(
119
            dbus_interface="org.coala_analyzer.v1")
120
        self.assertEqual(analysis[0], 255)
121
        self.assertEqual(analysis[1][1]["log_level"], "ERROR")
122
        self.assertEqual(analysis[1][1]["message"], Constants.CRASH_MESSAGE)
123
124
        # Skip file if file pattern doesn't match
125
        # Also test if 2 documents can be opened simultaneously
126
        self.document_object_path = self.remote_object.CreateDocument(
127
            "test.unknown_ext",
128
            dbus_interface="org.coala_analyzer.v1")
129
        self.document_object = self.bus.get_object(
130
            "org.coala_analyzer.v1.test",
131
            self.document_object_path)
132
        config_file = self.document_object.SetConfigFile(
133
            self.config_path,
134
            dbus_interface="org.coala_analyzer.v1")
135
        analysis = self.document_object.Analyze(
136
            dbus_interface="org.coala_analyzer.v1")
137
        self.assertEqual(analysis, (0, [], []))
138
139
        self.remote_object.DisposeDocument(
140
            self.testcode_c_path,
141
            dbus_interface="org.coala_analyzer.v1")
142
143
        self.remote_object.DisposeDocument(
144
            "test.unknown_ext",
145
            dbus_interface="org.coala_analyzer.v1")
146
147
    def tearDown(self):
148
        if self.subprocess:
149
            self.subprocess.kill()
150
151
152
def test_dbus():
    """Run as DBus test server or as unit test runner, depending on argv.

    If the first command line argument is ``"server"`` the blocking server
    main loop is started; otherwise the unit tests of this module are run
    verbosely.
    """
    mode = sys.argv[1] if len(sys.argv) > 1 else ""

    if mode == "server":
        create_mainloop()
        return

    unittest.main(verbosity=2)
161