Completed
Pull Request — master (#1538)
by Abdeali
01:49
created

coalib.tests.output.dbus.make_test_server()   B

Complexity

Conditions 1

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 27
rs 8.8571
1
import os
2
import subprocess
3
import sys
4
import time
5
import unittest
6
from unittest.case import SkipTest
7
8
from coalib.misc import Constants
9
10
try:
11
    import dbus
12
    # Needed to determine if test needs skipping
13
    from gi.repository import GLib
0 ignored issues
show
Unused Code introduced by
Unused GLib imported from gi.repository
Loading history...
14
except ImportError as err:
15
    raise SkipTest('python-dbus or python-gi is not installed')
16
17
18
def make_test_server():
19
    # Make a dbus service in a new process. It cannot be in this process
20
    # as that gives SegmentationFaults because the same bus is being used.
21
22
    # For some reason this also fails on some systems if moved to another file
23
    return subprocess.Popen([
24
        sys.executable,
25
        '-c',
26
        """
27
import sys
28
import dbus
29
import dbus.mainloop.glib
30
from gi.repository import GLib
31
from coalib.output.dbus.DbusServer import DbusServer
32
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
33
session_bus = dbus.SessionBus()
34
dbus_name = dbus.service.BusName("org.coala_analyzer.v1.test", session_bus)
35
dbus_server = DbusServer(session_bus, "/org/coala_analyzer/v1/test",
36
                         on_disconnected=lambda: GLib.idle_add(sys.exit))
37
mainloop = GLib.MainLoop()
38
print("Running server")
39
mainloop.run()
40
"""],
41
        stdout=subprocess.PIPE,
42
        stderr=subprocess.PIPE,
43
        stdin=subprocess.PIPE,
44
        universal_newlines=True)
45
46
47
class DbusTest(unittest.TestCase):
48
49
    def setUp(self):
50
        self.config_path = os.path.abspath(
51
            os.path.join(os.path.dirname(__file__),
52
                         "dbus_test_files",
53
                         ".coafile"))
54
        self.testcode_c_path = os.path.abspath(
55
            os.path.join(os.path.dirname(__file__),
56
                         "dbus_test_files",
57
                         "testcode.c"))
58
59
        self.subprocess = make_test_server()
60
        trials_left = 10
61
62
        while trials_left > 0:
63
            time.sleep(0.1)
64
            trials_left = trials_left - 1
65
            try:
66
                self.connect_to_test_server()
67
                continue
68
            except dbus.exceptions.DBusException as exception:
69
                stdout, stderr = self.subprocess.communicate()
70
                print("STDOUT", "-" * 70)
71
                print(stdout)
72
                print("STDERR", "-" * 70)
73
                print(stderr)
74
                if trials_left == 0:
75
                    raise exception
76
77
    def connect_to_test_server(self):
78
        self.bus = dbus.SessionBus()
79
        self.remote_object = self.bus.get_object("org.coala_analyzer.v1.test",
80
                                                 "/org/coala_analyzer/v1/test")
81
82
    def test_dbus(self):
83
        self.document_object_path = self.remote_object.CreateDocument(
84
            self.testcode_c_path,
85
            dbus_interface="org.coala_analyzer.v1")
86
87
        self.assertRegex(str(self.document_object_path),
88
                         r"^/org/coala_analyzer/v1/test/\d+/documents/\d+$")
89
90
        self.document_object = self.bus.get_object(
91
            "org.coala_analyzer.v1.test",
92
            self.document_object_path)
93
94
        config_file = self.document_object.SetConfigFile(
95
            "dummy_config",
96
            dbus_interface="org.coala_analyzer.v1")
97
        self.assertEqual(config_file, "dummy_config")
98
99
        config_file = self.document_object.GetConfigFile(
100
            dbus_interface="org.coala_analyzer.v1")
101
        self.assertEqual(config_file, "dummy_config")
102
103
        config_file = self.document_object.FindConfigFile(
104
            dbus_interface="org.coala_analyzer.v1")
105
        self.assertEqual(config_file, self.config_path)
106
107
        analysis = self.document_object.Analyze(
108
            dbus_interface="org.coala_analyzer.v1")
109
        self.maxDiff = None
110
        self.assertEqual(analysis,
111
                         (1,
112
                          [],
113
                          [('default',
114
                            True,
115
                            [{'debug_msg': '',
116
                              'file': '',
117
                              'id': analysis[2][0][2][0]['id'],
118
                              'line_nr': "",
119
                              'message': 'test msg',
120
                              'origin': 'LocalTestBear',
121
                              'severity': 'NORMAL'},
122
                             {'debug_msg': '',
123
                              'file': self.testcode_c_path,
124
                              'id': analysis[2][0][2][1]['id'],
125
                              'line_nr': "",
126
                              'message': 'test msg',
127
                              'origin': 'GlobalTestBear',
128
                              'severity': 'NORMAL'}])]))
129
130
        config_file = self.document_object.SetConfigFile(
131
            self.config_path + "2",
132
            dbus_interface="org.coala_analyzer.v1")
133
        analysis = self.document_object.Analyze(
134
            dbus_interface="org.coala_analyzer.v1")
135
        self.assertEqual(analysis[0], 255)
136
        self.assertEqual(analysis[1][1]["log_level"], "ERROR")
137
        self.assertEqual(analysis[1][1]["message"], Constants.CRASH_MESSAGE)
138
139
        # Skip file if file pattern doesn't match
140
        # Also test if 2 documents can be opened simultaneously
141
        self.document_object_path = self.remote_object.CreateDocument(
142
            "test.unknown_ext",
143
            dbus_interface="org.coala_analyzer.v1")
144
        self.document_object = self.bus.get_object(
145
            "org.coala_analyzer.v1.test",
146
            self.document_object_path)
147
        config_file = self.document_object.SetConfigFile(
148
            self.config_path,
149
            dbus_interface="org.coala_analyzer.v1")
150
        analysis = self.document_object.Analyze(
151
            dbus_interface="org.coala_analyzer.v1")
152
        self.assertEqual(analysis, (0, [], []))
153
154
        self.remote_object.DisposeDocument(
155
            self.testcode_c_path,
156
            dbus_interface="org.coala_analyzer.v1")
157
158
        self.remote_object.DisposeDocument(
159
            "test.unknown_ext",
160
            dbus_interface="org.coala_analyzer.v1")
161
162
    def tearDown(self):
163
        if self.subprocess:
164
            self.subprocess.kill()
165