Completed
Pull Request — master (#161)
by Denis
07:42 queued 05:07
created

check_eventgenerator_BaseEvent()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2
Metric Value
cc 1
dl 0
loc 6
ccs 0
cts 0
cp 0
crap 2
rs 9.4285
1 1
import unittest
2 1
from tests_common import CommonTests, add_server_methods, MySubHandler
3 1
import time
4 1
from datetime import timedelta
5
6 1
import opcua
7 1
from opcua import Server
8 1
from opcua import Client
9
from opcua import ua
10
11 1
12 1
# TCP ports used by the servers started in TestServer.setUpClass.
# A TCP port must fit in 16 bits (1-65535); the previous value 485140 did not.
port_num = 48540
port_discovery = 48550
14
15 1
16
class TestServer(unittest.TestCase, CommonTests):

    '''
    Run common tests on server side
    Tests that can only be run on server side must be defined here
    '''

    @classmethod
    def setUpClass(cls):
        """Start the server under test plus a second server acting as discovery server."""
        cls.srv = Server()
        cls.srv.set_endpoint('opc.tcp://localhost:%d' % port_num)
        add_server_methods(cls.srv)
        cls.srv.start()
        # the tests below address the server under test through ``opc``
        cls.opc = cls.srv
        try:
            cls.discovery = Server()
            cls.discovery.set_application_uri("urn:freeopcua:python:discovery")
            cls.discovery.set_endpoint('opc.tcp://localhost:%d' % port_discovery)
            cls.discovery.start()
        except Exception:
            # unittest does not run tearDownClass when setUpClass raises, so the
            # already started main server has to be stopped here.
            cls.srv.stop()
            raise

    @classmethod
    def tearDownClass(cls):
        """Stop both servers; the discovery server is stopped even if the first stop fails."""
        try:
            cls.srv.stop()
        finally:
            cls.discovery.stop()

    def test_discovery(self):
        """Registering the server makes exactly one new entry appear in find_servers()."""
        client = Client(self.discovery.endpoint.geturl())
        client.connect()
        try:
            servers = client.find_servers()
            new_app_uri = "urn:freeopcua:python:server:test_discovery"
            self.srv.application_uri = new_app_uri
            self.srv.register_to_discovery(self.discovery.endpoint.geturl(), 0)
            time.sleep(0.1)  # let server register registration
            new_servers = client.find_servers()
            self.assertEqual(len(new_servers) - len(servers), 1)
            self.assertNotIn(new_app_uri, [s.ApplicationUri for s in servers])
            self.assertIn(new_app_uri, [s.ApplicationUri for s in new_servers])
        finally:
            client.disconnect()

    def test_find_servers2(self):
        """Register two application URIs and check find_servers() with and without a URI filter."""
        client = Client(self.discovery.endpoint.geturl())
        client.connect()
        try:
            servers = client.find_servers()
            new_app_uri1 = "urn:freeopcua:python:server:test_discovery1"
            self.srv.application_uri = new_app_uri1
            self.srv.register_to_discovery(self.discovery.endpoint.geturl(), period=0)
            new_app_uri2 = "urn:freeopcua:python:test_discovery2"
            self.srv.application_uri = new_app_uri2
            self.srv.register_to_discovery(self.discovery.endpoint.geturl(), period=0)
            time.sleep(0.1)  # let server register registration
            new_servers = client.find_servers()
            self.assertEqual(len(new_servers) - len(servers), 2)
            self.assertNotIn(new_app_uri1, [s.ApplicationUri for s in servers])
            self.assertNotIn(new_app_uri2, [s.ApplicationUri for s in servers])
            self.assertIn(new_app_uri1, [s.ApplicationUri for s in new_servers])
            self.assertIn(new_app_uri2, [s.ApplicationUri for s in new_servers])
            # now do a query with a filter that only the first URI starts with
            new_servers = client.find_servers(["urn:freeopcua:python:server"])
            self.assertEqual(len(new_servers) - len(servers), 0)
            self.assertIn(new_app_uri1, [s.ApplicationUri for s in new_servers])
            self.assertNotIn(new_app_uri2, [s.ApplicationUri for s in new_servers])
            # now do a query with a filter that both URIs start with
            new_servers = client.find_servers(["urn:freeopcua:python"])
            self.assertEqual(len(new_servers) - len(servers), 2)
            self.assertIn(new_app_uri1, [s.ApplicationUri for s in new_servers])
            self.assertIn(new_app_uri2, [s.ApplicationUri for s in new_servers])
        finally:
            client.disconnect()

    """
    # not sure if this test is necessary, and there is a lot repetition with previous test
    def test_discovery_server_side(self):
        servers = self.discovery.find_servers()
        self.assertEqual(len(servers), 1)
        self.srv.register_to_discovery(self.discovery.endpoint.geturl(), 1)
        time.sleep(1) # let server register registration
        servers = self.discovery.find_servers()
        print("SERVERS 2", servers)
        self.assertEqual(len(servers), 2)
    """
    #def test_register_server2(self):
        #servers = self.opc.register_server()

    def test_register_namespace(self):
        """The index returned on registration equals the index looked up afterwards."""
        uri = 'http://mycustom.Namespace.com'
        idx1 = self.opc.register_namespace(uri)
        idx2 = self.opc.get_namespace_index(uri)
        self.assertEqual(idx1, idx2)

    def test_register_use_namespace(self):
        """A variable added under a freshly registered namespace carries that namespace index."""
        uri = 'http://my_very_custom.Namespace.com'
        idx = self.opc.register_namespace(uri)
        root = self.opc.get_root_node()
        myvar = root.add_variable(idx, 'var_in_custom_namespace', [5])
        myid = myvar.nodeid
        self.assertEqual(idx, myid.NamespaceIndex)

    def test_server_method(self):
        """Add a method that doubles its argument and call it on the objects node."""
        def func(parent, variant):
            variant.Value *= 2
            return [variant]
        o = self.opc.get_objects_node()
        v = o.add_method(3, 'Method1', func, [ua.VariantType.Int64], [ua.VariantType.Int64])
        result = o.call_method(v, ua.Variant(2.1))
        self.assertEqual(result, 4.2)

    def test_xml_import(self):
        """Import tests/custom_nodes.xml and read back the value of the imported variable."""
        self.srv.import_xml("tests/custom_nodes.xml")
        o = self.opc.get_objects_node()
        v = o.get_child(["MyXMLFolder", "MyXMLObject", "MyXMLVariable"])
        val = v.get_value()
        self.assertEqual(val, "StringValue")

    def test_historize(self):
        """Enable history on a variable, write two values, disable history again."""
        # No assertion here: the test only checks that none of the calls raises.
        o = self.opc.get_objects_node()
        var = o.add_variable(3, "test_hist", 1.0)
        self.srv.iserver.enable_history(var, timedelta(days=1))
        time.sleep(1)
        var.set_value(2.0)
        var.set_value(3.0)
        self.srv.iserver.disable_history(var)

    # This should work for the following BaseEvent tests to work
    # (maybe write it a bit differently since they are not independent)
    def test_get_event_from_node_BaseEvent(self):
        """Build an event from the BaseEventType node and check its base fields."""
        node = opcua.Node(self.opc.iserver.isession, ua.NodeId(ua.ObjectIds.BaseEventType))
        ev = opcua.common.event.get_event_from_node(node)
        check_base_event(self, ev)

    def test_get_event_from_node_CustomEvent(self):
        """Build an event from the AuditEventType node and check its base fields."""
        # NOTE(review): check_base_event asserts EventType == BaseEventType while the
        # node used here is AuditEventType - confirm this is the intended expectation.
        node = opcua.Node(self.opc.iserver.isession, ua.NodeId(ua.ObjectIds.AuditEventType))
        ev = opcua.common.event.get_event_from_node(node)
        check_base_event(self, ev)

    #def test_get_event_from_node_InheritanceEvent(self):
        #ev = opcua.common.event.get_event_from_node(opcua.Node(self.opc.iserver.isession, ua.NodeId(ua.ObjectIds.AuditEventType)))
        #self.assertIsNot(ev, None)  # we did not receive event
        #self.assertIsInstance(ev, ua.BaseEvent)
        #self.assertIsInstance(ev, ua.AuditEventType)
        #self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.AuditEventType))
        #self.assertEqual(ev.SourceNode, ua.NodeId(ua.ObjectIds.Server))
        #self.assertEqual(ev.Severity, ua.Variant(1, ua.VariantType.UInt16))
        #self.assertEqual(ev._freeze, True)

    def test_eventgenerator_default(self):
        """An event generator requested without arguments passes the BaseEvent checks."""
        evgen = self.opc.get_event_generator()
        check_eventgenerator_BaseEvent(self, evgen)

    def test_eventgenerator_BaseEvent_object(self):
        """The event type may be given as a ua.BaseEvent instance."""
        evgen = self.opc.get_event_generator(ua.BaseEvent())
        check_eventgenerator_BaseEvent(self, evgen)

    def test_eventgenerator_BaseEvent_Node(self):
        """The event type may be given as an opcua.Node."""
        node = opcua.Node(self.opc.iserver.isession, ua.NodeId(ua.ObjectIds.BaseEventType))
        evgen = self.opc.get_event_generator(node)
        check_eventgenerator_BaseEvent(self, evgen)

    def test_eventgenerator_BaseEvent_NodeId(self):
        """The event type may be given as a ua.NodeId."""
        evgen = self.opc.get_event_generator(ua.NodeId(ua.ObjectIds.BaseEventType))
        check_eventgenerator_BaseEvent(self, evgen)

    def test_eventgenerator_BaseEvent_ObjectIds(self):
        """The event type may be given as a ua.ObjectIds constant."""
        evgen = self.opc.get_event_generator(ua.ObjectIds.BaseEventType)
        check_eventgenerator_BaseEvent(self, evgen)

    def test_eventgenerator_BaseEvent_Identifier(self):
        """The event type may be given as a bare integer identifier."""
        # 2041 is presumably the numeric value of ua.ObjectIds.BaseEventType - confirm
        evgen = self.opc.get_event_generator(2041)
        check_eventgenerator_BaseEvent(self, evgen)

    # The tests below are empty placeholders: they pass without checking anything.
    def test_eventgenerator_sourceServer_Node(self):
        pass

    def test_eventgenerator_sourceServer_NodeId(self):
        pass

    def test_eventgenerator_sourceServer_ObjectIds(self):
        pass

    def test_eventgenerator_CustomEvent_object(self):
        pass

    def test_eventgenerator_CustomEvent_Node(self):
        pass

    def test_eventgenerator_CustomEvent_NodeId(self):
        pass

    def test_eventgenerator_CustomEvent_ObjectIds(self):
        pass
203
204
    #def test_eventgenerator_InheritedEvent(self):
205
        #pass
206
207
208
209
    #def test_events_default(self):
210
        #msclt = MySubHandler()
211
        #sub = self.opc.create_subscription(100, msclt)
212
        #handle = sub.subscribe_events()
213
214
        #ev = EventGenerator(self.srv.iserver.isession)
215
        #msg = b"this is my msg "
216
        #ev.Message.Text = msg
217
        #tid = datetime.utcnow()
218
        #ev.Time = tid
219
        #ev.Severity = 500
220
        #ev.trigger()
221
222
        #ev = msclt.future.result()
223
        #self.assertIsNot(ev, None)  # we did not receive event
224
        #self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
225
        #self.assertEqual(ev.Message.Text, msg)
226
        ##self.assertEqual(msclt.ev.Time, tid)
227
        #self.assertEqual(ev.Severity, 500)
228
229
        ## time.sleep(0.1)
230
        #sub.unsubscribe(handle)
231
        #sub.delete()
232
233
234
def check_eventgenerator_BaseEvent(test, evgen):
    """Run the shared assertions for an event generator expected to produce BaseEvents.

    test  -- the running TestCase; supplies the assert methods and the server as ``test.opc``
    evgen -- the event generator to verify
    """
    test.assertIsNot(evgen, None)  # we did not receive event generator

    # the generator has to be bound to the internal session of the server
    server_session = test.opc.iserver.isession
    test.assertIs(evgen.isession, server_session)

    check_base_event(test, evgen.event)

    # the source name of the event has to match the display name of the server node
    actual_name = evgen.event.SourceName
    expected_name = test.opc.get_server_node().get_display_name().Text
    test.assertEqual(actual_name, expected_name)

    # the EventNotifier attribute of the server node has to be Byte 1
    notifier = test.opc.get_server_node().get_attribute(ua.AttributeIds.EventNotifier)
    test.assertEqual(notifier.Value, ua.Variant(1, ua.VariantType.Byte))
240
241
242
def check_base_event(test, ev):
    """Assert that ``ev`` is a ua.BaseEvent carrying the expected base field values.

    test -- the running TestCase; supplies the assert methods
    ev   -- the event object to verify
    """
    test.assertIsNot(ev, None)  # we did not receive event
    test.assertIsInstance(ev, ua.BaseEvent)

    event_type = ev.EventType
    test.assertEqual(event_type, ua.NodeId(ua.ObjectIds.BaseEventType))

    source_node = ev.SourceNode
    test.assertEqual(source_node, ua.NodeId(ua.ObjectIds.Server))

    severity = ev.Severity
    test.assertEqual(severity, ua.Variant(1, ua.VariantType.UInt16))

    frozen = ev._freeze
    test.assertEqual(frozen, True)
249