Completed
Pull Request — master (#1538)
by Abdeali
01:47
created

coalib.tests.output.dbus.make_test_server()   B

Complexity

Conditions 1

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 26
rs 8.8571
1
import os
2
import subprocess
3
import sys
4
import time
5
import unittest
6
from unittest.case import SkipTest
7
8
from coalib.misc import Constants
9
10
try:
11
    import dbus
12
    # Needed to determine if test needs skipping
13
    from gi.repository import GLib
0 ignored issues
show
Unused Code introduced by
Unused GLib imported from gi.repository
Loading history...
14
except ImportError as err:
15
    raise SkipTest('python-dbus or python-gi is not installed')
16
17
18
def make_test_server():
19
    # Make a dbus service in a new process. It cannot be in this process
20
    # as that gives SegmentationFaults because the same bus is being used.
21
22
    # For some reason this also fails on some systems if moved to another file
23
    return subprocess.Popen([
24
        sys.executable,
25
        '-c',
26
        """
27
import sys
28
import dbus
29
import dbus.mainloop.glib
30
from gi.repository import GLib
31
from coalib.output.dbus.DbusServer import DbusServer
32
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
33
session_bus = dbus.SessionBus()
34
dbus_name = dbus.service.BusName("org.coala_analyzer.v1.test", session_bus)
35
dbus_server = DbusServer(session_bus, "/org/coala_analyzer/v1/test",
36
                         on_disconnected=lambda: GLib.idle_add(sys.exit))
37
mainloop = GLib.MainLoop()
38
mainloop.run()
39
"""],
40
        stdout=subprocess.PIPE,
41
        stderr=subprocess.PIPE,
42
        stdin=subprocess.PIPE,
43
        universal_newlines=True)
44
45
46
class DbusTest(unittest.TestCase):
47
48
    def setUp(self):
49
        self.config_path = os.path.abspath(
50
            os.path.join(os.path.dirname(__file__),
51
                         "dbus_test_files",
52
                         ".coafile"))
53
        self.testcode_c_path = os.path.abspath(
54
            os.path.join(os.path.dirname(__file__),
55
                         "dbus_test_files",
56
                         "testcode.c"))
57
58
        self.subprocess = make_test_server()
59
        trials_left = 10
60
61
        while trials_left > 0:
62
            time.sleep(0.1)
63
            trials_left = trials_left - 1
64
            try:
65
                self.connect_to_test_server()
66
                continue
67
            except dbus.exceptions.DBusException as exception:
68
                stdout, stderr = self.subprocess.communicate()
69
                print("STDOUT", "-" * 70)
70
                print(stdout)
71
                print("STDERR", "-" * 70)
72
                print(stderr)
73
                if trials_left == 0:
74
                    raise exception
75
76
    def connect_to_test_server(self):
77
        self.bus = dbus.SessionBus()
78
        self.remote_object = self.bus.get_object("org.coala_analyzer.v1.test",
79
                                                 "/org/coala_analyzer/v1/test")
80
81
    def test_dbus(self):
82
        self.document_object_path = self.remote_object.CreateDocument(
83
            self.testcode_c_path,
84
            dbus_interface="org.coala_analyzer.v1")
85
86
        self.assertRegex(str(self.document_object_path),
87
                         r"^/org/coala_analyzer/v1/test/\d+/documents/\d+$")
88
89
        self.document_object = self.bus.get_object(
90
            "org.coala_analyzer.v1.test",
91
            self.document_object_path)
92
93
        config_file = self.document_object.SetConfigFile(
94
            "dummy_config",
95
            dbus_interface="org.coala_analyzer.v1")
96
        self.assertEqual(config_file, "dummy_config")
97
98
        config_file = self.document_object.GetConfigFile(
99
            dbus_interface="org.coala_analyzer.v1")
100
        self.assertEqual(config_file, "dummy_config")
101
102
        config_file = self.document_object.FindConfigFile(
103
            dbus_interface="org.coala_analyzer.v1")
104
        self.assertEqual(config_file, self.config_path)
105
106
        analysis = self.document_object.Analyze(
107
            dbus_interface="org.coala_analyzer.v1")
108
        self.maxDiff = None
109
        self.assertEqual(analysis,
110
                         (1,
111
                          [],
112
                          [('default',
113
                            True,
114
                            [{'debug_msg': '',
115
                              'file': '',
116
                              'id': analysis[2][0][2][0]['id'],
117
                              'line_nr': "",
118
                              'message': 'test msg',
119
                              'origin': 'LocalTestBear',
120
                              'severity': 'NORMAL'},
121
                             {'debug_msg': '',
122
                              'file': self.testcode_c_path,
123
                              'id': analysis[2][0][2][1]['id'],
124
                              'line_nr': "",
125
                              'message': 'test msg',
126
                              'origin': 'GlobalTestBear',
127
                              'severity': 'NORMAL'}])]))
128
129
        config_file = self.document_object.SetConfigFile(
130
            self.config_path + "2",
131
            dbus_interface="org.coala_analyzer.v1")
132
        analysis = self.document_object.Analyze(
133
            dbus_interface="org.coala_analyzer.v1")
134
        self.assertEqual(analysis[0], 255)
135
        self.assertEqual(analysis[1][1]["log_level"], "ERROR")
136
        self.assertEqual(analysis[1][1]["message"], Constants.CRASH_MESSAGE)
137
138
        # Skip file if file pattern doesn't match
139
        # Also test if 2 documents can be opened simultaneously
140
        self.document_object_path = self.remote_object.CreateDocument(
141
            "test.unknown_ext",
142
            dbus_interface="org.coala_analyzer.v1")
143
        self.document_object = self.bus.get_object(
144
            "org.coala_analyzer.v1.test",
145
            self.document_object_path)
146
        config_file = self.document_object.SetConfigFile(
147
            self.config_path,
148
            dbus_interface="org.coala_analyzer.v1")
149
        analysis = self.document_object.Analyze(
150
            dbus_interface="org.coala_analyzer.v1")
151
        self.assertEqual(analysis, (0, [], []))
152
153
        self.remote_object.DisposeDocument(
154
            self.testcode_c_path,
155
            dbus_interface="org.coala_analyzer.v1")
156
157
        self.remote_object.DisposeDocument(
158
            "test.unknown_ext",
159
            dbus_interface="org.coala_analyzer.v1")
160
161
    def tearDown(self):
162
        if self.subprocess:
163
            self.subprocess.kill()
164