| Total Complexity | 69 |
| Total Lines | 563 |
| Duplicated Lines | 4.62 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like tests.test_subscriptions often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import time |
||
| 2 | import pytest |
||
| 3 | from copy import copy |
||
| 4 | from asyncio import Future, sleep, wait_for, TimeoutError |
||
| 5 | from datetime import datetime, timedelta |
||
| 6 | |||
| 7 | import asyncua |
||
| 8 | from asyncua import ua |
||
| 9 | |||
| 10 | pytestmark = pytest.mark.asyncio |
||
| 11 | |||
| 12 | |||
| 13 | class SubHandler: |
||
| 14 | """ |
||
| 15 | Dummy subscription client |
||
| 16 | """ |
||
| 17 | |||
| 18 | def datachange_notification(self, node, val, data): |
||
| 19 | pass |
||
| 20 | |||
| 21 | def event_notification(self, event): |
||
| 22 | pass |
||
| 23 | |||
| 24 | |||
| 25 | class MySubHandler: |
||
| 26 | """ |
||
| 27 | More advanced subscription client using Future, so we can wait for events in tests |
||
| 28 | """ |
||
| 29 | |||
| 30 | def __init__(self): |
||
| 31 | self.future = Future() |
||
| 32 | |||
| 33 | def reset(self): |
||
| 34 | self.future = Future() |
||
| 35 | |||
| 36 | async def result(self): |
||
| 37 | return await wait_for(self.future, 2) |
||
| 38 | |||
| 39 | def datachange_notification(self, node, val, data): |
||
| 40 | self.future.set_result((node, val, data)) |
||
| 41 | |||
| 42 | def event_notification(self, event): |
||
| 43 | self.future.set_result(event) |
||
| 44 | |||
| 45 | |||
| 46 | class MySubHandler2: |
||
| 47 | def __init__(self): |
||
| 48 | self.results = [] |
||
| 49 | |||
| 50 | def datachange_notification(self, node, val, data): |
||
| 51 | self.results.append((node, val)) |
||
| 52 | |||
| 53 | def event_notification(self, event): |
||
| 54 | self.results.append(event) |
||
| 55 | |||
| 56 | |||
| 57 | class MySubHandlerCounter: |
||
| 58 | def __init__(self): |
||
| 59 | self.datachange_count = 0 |
||
| 60 | self.event_count = 0 |
||
| 61 | |||
| 62 | def datachange_notification(self, node, val, data): |
||
| 63 | self.datachange_count += 1 |
||
| 64 | |||
| 65 | def event_notification(self, event): |
||
| 66 | self.event_count += 1 |
||
| 67 | |||
| 68 | |||
| 69 | async def test_subscription_failure(opc): |
||
| 70 | myhandler = MySubHandler() |
||
| 71 | o = opc.opc.get_objects_node() |
||
| 72 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 73 | with pytest.raises(ua.UaStatusCodeError): |
||
| 74 | # we can only subscribe to variables so this should fail |
||
| 75 | handle1 = await sub.subscribe_data_change(o) |
||
| 76 | await sub.delete() |
||
| 77 | |||
| 78 | |||
| 79 | async def test_subscription_overload(opc): |
||
| 80 | nb = 10 |
||
| 81 | myhandler = SubHandler() |
||
| 82 | o = opc.opc.get_objects_node() |
||
| 83 | sub = await opc.opc.create_subscription(1, myhandler) |
||
| 84 | variables = [] |
||
| 85 | subs = [] |
||
| 86 | for i in range(nb): |
||
| 87 | v = await o.add_variable(3, f'SubscriptionVariableOverload{i}', 99) |
||
| 88 | variables.append(v) |
||
| 89 | for i in range(nb): |
||
| 90 | await sub.subscribe_data_change(variables) |
||
| 91 | for i in range(nb): |
||
| 92 | for j in range(nb): |
||
| 93 | await variables[i].set_value(j) |
||
| 94 | s = await opc.opc.create_subscription(1, myhandler) |
||
| 95 | await s.subscribe_data_change(variables) |
||
| 96 | subs.append(s) |
||
| 97 | await sub.subscribe_data_change(variables[i]) |
||
| 98 | for i in range(nb): |
||
| 99 | for j in range(nb): |
||
| 100 | await variables[i].set_value(j) |
||
| 101 | await sub.delete() |
||
| 102 | for s in subs: |
||
| 103 | await s.delete() |
||
| 104 | |||
| 105 | |||
| 106 | async def test_subscription_count(opc): |
||
| 107 | myhandler = MySubHandlerCounter() |
||
| 108 | sub = await opc.opc.create_subscription(1, myhandler) |
||
| 109 | o = opc.opc.get_objects_node() |
||
| 110 | var = await o.add_variable(3, 'SubVarCounter', 0.1) |
||
| 111 | await sub.subscribe_data_change(var) |
||
| 112 | nb = 12 |
||
| 113 | for i in range(nb): |
||
| 114 | val = await var.get_value() |
||
| 115 | await var.set_value(val + 1) |
||
| 116 | await sleep(0.2) # let last event arrive |
||
| 117 | assert nb + 1 == myhandler.datachange_count |
||
| 118 | await sub.delete() |
||
| 119 | |||
| 120 | |||
| 121 | async def test_subscription_count_list(opc): |
||
| 122 | myhandler = MySubHandlerCounter() |
||
| 123 | sub = await opc.opc.create_subscription(1, myhandler) |
||
| 124 | o = opc.opc.get_objects_node() |
||
| 125 | var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2]) |
||
| 126 | await sub.subscribe_data_change(var) |
||
| 127 | nb = 12 |
||
| 128 | for i in range(nb): |
||
| 129 | val = await var.get_value() |
||
| 130 | # we do not want to modify object in our db, we need a copy in order to generate event |
||
| 131 | val = copy(val) |
||
| 132 | val.append(i) |
||
| 133 | await var.set_value(copy(val)) |
||
| 134 | await sleep(0.2) # let last event arrive |
||
| 135 | assert nb + 1 == myhandler.datachange_count |
||
| 136 | await sub.delete() |
||
| 137 | |||
| 138 | |||
| 139 | async def test_subscription_count_no_change(opc): |
||
| 140 | myhandler = MySubHandlerCounter() |
||
| 141 | sub = await opc.opc.create_subscription(1, myhandler) |
||
| 142 | o = opc.opc.get_objects_node() |
||
| 143 | var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2]) |
||
| 144 | await sub.subscribe_data_change(var) |
||
| 145 | nb = 12 |
||
| 146 | for i in range(nb): |
||
| 147 | val = await var.get_value() |
||
| 148 | await var.set_value(val) |
||
| 149 | await sleep(0.2) # let last event arrive |
||
| 150 | assert 1 == myhandler.datachange_count |
||
| 151 | await sub.delete() |
||
| 152 | |||
| 153 | |||
| 154 | async def test_subscription_count_empty(opc): |
||
| 155 | myhandler = MySubHandlerCounter() |
||
| 156 | sub = await opc.opc.create_subscription(1, myhandler) |
||
| 157 | o = opc.opc.get_objects_node() |
||
| 158 | var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3]) |
||
| 159 | await sub.subscribe_data_change(var) |
||
| 160 | while True: |
||
| 161 | val = await var.get_value() |
||
| 162 | # we do not want to modify object in our db, we need a copy in order to generate event |
||
| 163 | val = copy(val) |
||
| 164 | val.pop() |
||
| 165 | await var.set_value(val, ua.VariantType.Double) |
||
| 166 | if not val: |
||
| 167 | break |
||
| 168 | await sleep(0.2) # let last event arrive |
||
| 169 | assert 4 == myhandler.datachange_count |
||
| 170 | await sub.delete() |
||
| 171 | |||
| 172 | |||
| 173 | async def test_subscription_overload_simple(opc): |
||
| 174 | nb = 10 |
||
| 175 | myhandler = MySubHandler() |
||
| 176 | o = opc.opc.get_objects_node() |
||
| 177 | sub = await opc.opc.create_subscription(1, myhandler) |
||
| 178 | variables = [] |
||
| 179 | for i in range(nb): |
||
| 180 | variables.append(await o.add_variable(3, f'SubVarOverload{i}', i)) |
||
| 181 | for i in range(nb): |
||
| 182 | await sub.subscribe_data_change(variables) |
||
| 183 | await sub.delete() |
||
| 184 | |||
| 185 | |||
| 186 | async def test_subscription_data_change(opc): |
||
| 187 | """ |
||
| 188 | test subscriptions. This is far too complicated for |
||
| 189 | a unittest but, setting up subscriptions requires a lot |
||
| 190 | of code, so when we first set it up, it is best |
||
| 191 | to test as many things as possible |
||
| 192 | """ |
||
| 193 | myhandler = MySubHandler() |
||
| 194 | o = opc.opc.get_objects_node() |
||
| 195 | # subscribe to a variable |
||
| 196 | startv1 = [1, 2, 3] |
||
| 197 | v1 = await o.add_variable(3, 'SubscriptionVariableV1', startv1) |
||
| 198 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 199 | handle1 = await sub.subscribe_data_change(v1) |
||
| 200 | # Now check we get the start value |
||
| 201 | node, val, data = await myhandler.result() |
||
| 202 | assert startv1 == val |
||
| 203 | assert v1 == node |
||
| 204 | myhandler.reset() # reset future object |
||
| 205 | # modify v1 and check we get value |
||
| 206 | await v1.set_value([5]) |
||
| 207 | node, val, data = await myhandler.result() |
||
| 208 | assert v1 == node |
||
| 209 | assert [5] == val |
||
| 210 | with pytest.raises(ua.UaStatusCodeError): |
||
| 211 | await sub.unsubscribe(999) # non existing handle |
||
| 212 | await sub.unsubscribe(handle1) |
||
| 213 | with pytest.raises(ua.UaStatusCodeError): |
||
| 214 | await sub.unsubscribe(handle1) # second try should fail |
||
| 215 | await sub.delete() |
||
| 216 | with pytest.raises(ua.UaStatusCodeError): |
||
| 217 | await sub.unsubscribe(handle1) # sub does not exist anymore |
||
| 218 | |||
| 219 | |||
| 220 | async def test_subscription_data_change_bool(opc): |
||
| 221 | """ |
||
| 222 | test subscriptions. This is far too complicated for |
||
| 223 | a unittest but, setting up subscriptions requires a lot |
||
| 224 | of code, so when we first set it up, it is best |
||
| 225 | to test as many things as possible |
||
| 226 | """ |
||
| 227 | myhandler = MySubHandler() |
||
| 228 | o = opc.opc.get_objects_node() |
||
| 229 | # subscribe to a variable |
||
| 230 | startv1 = True |
||
| 231 | v1 = await o.add_variable(3, 'SubscriptionVariableBool', startv1) |
||
| 232 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 233 | handle1 = await sub.subscribe_data_change(v1) |
||
| 234 | # Now check we get the start value |
||
| 235 | node, val, data = await myhandler.result() |
||
| 236 | assert startv1 == val |
||
| 237 | assert v1 == node |
||
| 238 | myhandler.reset() # reset future object |
||
| 239 | # modify v1 and check we get value |
||
| 240 | await v1.set_value(False) |
||
| 241 | node, val, data = await myhandler.result() |
||
| 242 | assert v1 == node |
||
| 243 | assert val is False |
||
| 244 | await sub.delete() # should delete our monitoreditem too |
||
| 245 | |||
| 246 | |||
| 247 | async def test_subscription_data_change_many(opc): |
||
| 248 | """ |
||
| 249 | test subscriptions. This is far too complicated for |
||
| 250 | a unittest but, setting up subscriptions requires a lot |
||
| 251 | of code, so when we first set it up, it is best |
||
| 252 | to test as many things as possible |
||
| 253 | """ |
||
| 254 | myhandler = MySubHandler2() |
||
| 255 | o = opc.opc.get_objects_node() |
||
| 256 | startv1 = True |
||
| 257 | v1 = await o.add_variable(3, 'SubscriptionVariableMany1', startv1) |
||
| 258 | startv2 = [1.22, 1.65] |
||
| 259 | v2 = await o.add_variable(3, 'SubscriptionVariableMany2', startv2) |
||
| 260 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 261 | handle1, handle2 = await sub.subscribe_data_change([v1, v2]) |
||
| 262 | # Now check we get the start values |
||
| 263 | nodes = [v1, v2] |
||
| 264 | count = 0 |
||
| 265 | while not len(myhandler.results) > 1: |
||
| 266 | count += 1 |
||
| 267 | await sleep(0.1) |
||
| 268 | if count > 100: |
||
| 269 | raise RuntimeError("Did not get result from subscription") |
||
| 270 | for node, val in myhandler.results: |
||
| 271 | assert node in nodes |
||
| 272 | nodes.remove(node) |
||
| 273 | if node == v1: |
||
| 274 | assert val == startv1 |
||
| 275 | elif node == v2: |
||
| 276 | assert val == startv2 |
||
| 277 | else: |
||
| 278 | raise RuntimeError("Error node {0} is neither {1} nor {2}".format(node, v1, v2)) |
||
| 279 | await sub.delete() |
||
| 280 | |||
| 281 | |||
| 282 | async def test_subscribe_server_time(opc): |
||
| 283 | myhandler = MySubHandler() |
||
| 284 | server_time_node = opc.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime)) |
||
| 285 | sub = await opc.opc.create_subscription(200, myhandler) |
||
| 286 | handle = await sub.subscribe_data_change(server_time_node) |
||
| 287 | node, val, data = await myhandler.result() |
||
| 288 | assert server_time_node == node |
||
| 289 | delta = datetime.utcnow() - val |
||
| 290 | assert delta < timedelta(seconds=2) |
||
| 291 | await sub.unsubscribe(handle) |
||
| 292 | await sub.delete() |
||
| 293 | |||
| 294 | |||
| 295 | async def test_create_delete_subscription(opc): |
||
| 296 | o = opc.opc.get_objects_node() |
||
| 297 | v = await o.add_variable(3, 'SubscriptionVariable', [1, 2, 3]) |
||
| 298 | sub = await opc.opc.create_subscription(100, MySubHandler()) |
||
| 299 | handle = await sub.subscribe_data_change(v) |
||
| 300 | await sleep(0.1) |
||
| 301 | await sub.unsubscribe(handle) |
||
| 302 | await sub.delete() |
||
| 303 | |||
| 304 | |||
| 305 | async def test_subscribe_events(opc): |
||
| 306 | sub = await opc.opc.create_subscription(100, MySubHandler()) |
||
| 307 | handle = await sub.subscribe_events() |
||
| 308 | await sleep(0.1) |
||
| 309 | await sub.unsubscribe(handle) |
||
| 310 | await sub.delete() |
||
| 311 | |||
| 312 | |||
| 313 | async def test_subscribe_events_to_wrong_node(opc): |
||
| 314 | sub = await opc.opc.create_subscription(100, MySubHandler()) |
||
| 315 | with pytest.raises(ua.UaStatusCodeError): |
||
| 316 | handle = await sub.subscribe_events(opc.opc.get_node("i=85")) |
||
| 317 | o = opc.opc.get_objects_node() |
||
| 318 | v = await o.add_variable(3, 'VariableNoEventNofierAttribute', 4) |
||
| 319 | with pytest.raises(ua.UaStatusCodeError): |
||
| 320 | handle = await sub.subscribe_events(v) |
||
| 321 | await sub.delete() |
||
| 322 | |||
| 323 | |||
| 324 | async def test_get_event_from_type_node_BaseEvent(opc): |
||
| 325 | etype = opc.opc.get_node(ua.ObjectIds.BaseEventType) |
||
| 326 | properties = await asyncua.common.events.get_event_properties_from_type_node(etype) |
||
| 327 | for child in await etype.get_properties(): |
||
| 328 | assert child in properties |
||
| 329 | |||
| 330 | |||
| 331 | async def test_get_event_from_type_node_CustomEvent(opc): |
||
| 332 | etype = await opc.server.create_custom_event_type( |
||
| 333 | 2, 'MyEvent', ua.ObjectIds.AuditEventType, |
||
| 334 | [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)] |
||
| 335 | ) |
||
| 336 | properties = await asyncua.common.events.get_event_properties_from_type_node(etype) |
||
| 337 | for child in await opc.opc.get_node(ua.ObjectIds.BaseEventType).get_properties(): |
||
| 338 | assert child in properties |
||
| 339 | for child in await opc.opc.get_node(ua.ObjectIds.AuditEventType).get_properties(): |
||
| 340 | assert child in properties |
||
| 341 | for child in await opc.opc.get_node(etype.nodeid).get_properties(): |
||
| 342 | assert child in properties |
||
| 343 | assert await etype.get_child("2:PropertyNum") in properties |
||
| 344 | assert await etype.get_child("2:PropertyString") in properties |
||
| 345 | |||
| 346 | |||
| 347 | async def test_events_default(opc): |
||
| 348 | evgen = await opc.server.get_event_generator() |
||
| 349 | myhandler = MySubHandler() |
||
| 350 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 351 | handle = await sub.subscribe_events() |
||
| 352 | tid = datetime.utcnow() |
||
| 353 | msg = "this is my msg " |
||
| 354 | evgen.trigger(tid, msg) |
||
| 355 | ev = await myhandler.result() |
||
| 356 | assert ev is not None # we did not receive event |
||
| 357 | assert ua.NodeId(ua.ObjectIds.BaseEventType) == ev.EventType |
||
| 358 | assert 1 == ev.Severity |
||
| 359 | assert (await opc.opc.get_server_node().get_browse_name()).Name == ev.SourceName |
||
| 360 | assert opc.opc.get_server_node().nodeid == ev.SourceNode |
||
| 361 | assert msg == ev.Message.Text |
||
| 362 | assert tid == ev.Time |
||
| 363 | await sub.unsubscribe(handle) |
||
| 364 | await sub.delete() |
||
| 365 | |||
| 366 | |||
| 367 | async def test_events_MyObject(opc): |
||
| 368 | objects = opc.server.get_objects_node() |
||
| 369 | o = await objects.add_object(3, 'MyObject') |
||
| 370 | evgen = await opc.server.get_event_generator(emitting_node=o) |
||
| 371 | myhandler = MySubHandler() |
||
| 372 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 373 | handle = await sub.subscribe_events(o) |
||
| 374 | tid = datetime.utcnow() |
||
| 375 | msg = "this is my msg " |
||
| 376 | evgen.trigger(tid, msg) |
||
| 377 | ev = await myhandler.result() |
||
| 378 | assert ev is not None # we did not receive event |
||
| 379 | assert ua.NodeId(ua.ObjectIds.BaseEventType) == ev.EventType |
||
| 380 | assert 1 == ev.Severity |
||
| 381 | assert 'MyObject' == ev.SourceName |
||
| 382 | assert o.nodeid == ev.SourceNode |
||
| 383 | assert msg == ev.Message.Text |
||
| 384 | assert tid == ev.Time |
||
| 385 | await sub.unsubscribe(handle) |
||
| 386 | await sub.delete() |
||
| 387 | |||
| 388 | |||
| 389 | async def test_events_wrong_source(opc): |
||
| 390 | objects = opc.server.get_objects_node() |
||
| 391 | o = await objects.add_object(3, 'MyObject') |
||
| 392 | evgen = await opc.server.get_event_generator(emitting_node=o) |
||
| 393 | myhandler = MySubHandler() |
||
| 394 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 395 | handle = await sub.subscribe_events() |
||
| 396 | tid = datetime.utcnow() |
||
| 397 | msg = "this is my msg " |
||
| 398 | evgen.trigger(tid, msg) |
||
| 399 | with pytest.raises(TimeoutError): # we should not receive event |
||
| 400 | ev = await myhandler.result() |
||
| 401 | await sub.unsubscribe(handle) |
||
| 402 | await sub.delete() |
||
| 403 | |||
| 404 | |||
| 405 | async def test_events_CustomEvent(opc): |
||
| 406 | etype = await opc.server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, |
||
| 407 | [('PropertyNum', ua.VariantType.Float), |
||
| 408 | ('PropertyString', ua.VariantType.String)]) |
||
| 409 | evgen = await opc.server.get_event_generator(etype) |
||
| 410 | myhandler = MySubHandler() |
||
| 411 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 412 | handle = await sub.subscribe_events(evtypes=etype) |
||
| 413 | propertynum = 2 |
||
| 414 | propertystring = "This is my test" |
||
| 415 | evgen.event.PropertyNum = propertynum |
||
| 416 | evgen.event.PropertyString = propertystring |
||
| 417 | serverity = 500 |
||
| 418 | evgen.event.Severity = serverity |
||
| 419 | tid = datetime.utcnow() |
||
| 420 | msg = "this is my msg " |
||
| 421 | evgen.trigger(tid, msg) |
||
| 422 | ev = await myhandler.result() |
||
| 423 | assert ev is not None # we did not receive event |
||
| 424 | assert etype.nodeid == ev.EventType |
||
| 425 | assert serverity == ev.Severity |
||
| 426 | assert (await opc.opc.get_server_node().get_browse_name()).Name == ev.SourceName |
||
| 427 | assert opc.opc.get_server_node().nodeid == ev.SourceNode |
||
| 428 | assert msg == ev.Message.Text |
||
| 429 | assert tid == ev.Time |
||
| 430 | assert propertynum == ev.PropertyNum |
||
| 431 | assert propertystring == ev.PropertyString |
||
| 432 | await sub.unsubscribe(handle) |
||
| 433 | await sub.delete() |
||
| 434 | |||
| 435 | |||
| 436 | async def test_events_CustomEvent_MyObject(opc): |
||
| 437 | objects = opc.server.get_objects_node() |
||
| 438 | o = await objects.add_object(3, 'MyObject') |
||
| 439 | etype = await opc.server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, |
||
| 440 | [('PropertyNum', ua.VariantType.Float), |
||
| 441 | ('PropertyString', ua.VariantType.String)]) |
||
| 442 | evgen = await opc.server.get_event_generator(etype, emitting_node=o) |
||
| 443 | myhandler = MySubHandler() |
||
| 444 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 445 | handle = await sub.subscribe_events(o, etype) |
||
| 446 | propertynum = 2 |
||
| 447 | propertystring = "This is my test" |
||
| 448 | evgen.event.PropertyNum = propertynum |
||
| 449 | evgen.event.PropertyString = propertystring |
||
| 450 | tid = datetime.utcnow() |
||
| 451 | msg = "this is my msg " |
||
| 452 | evgen.trigger(tid, msg) |
||
| 453 | ev = await myhandler.result() |
||
| 454 | assert ev is not None # we did not receive event |
||
| 455 | assert etype.nodeid == ev.EventType |
||
| 456 | assert 1 == ev.Severity |
||
| 457 | assert 'MyObject' == ev.SourceName |
||
| 458 | assert o.nodeid == ev.SourceNode |
||
| 459 | assert msg == ev.Message.Text |
||
| 460 | assert tid == ev.Time |
||
| 461 | assert propertynum == ev.PropertyNum |
||
| 462 | assert propertystring == ev.PropertyString |
||
| 463 | await sub.unsubscribe(handle) |
||
| 464 | await sub.delete() |
||
| 465 | |||
| 466 | |||
| 467 | async def test_several_different_events(opc): |
||
| 468 | objects = opc.server.get_objects_node() |
||
| 469 | o = await objects.add_object(3, 'MyObject') |
||
| 470 | etype1 = await opc.server.create_custom_event_type(2, 'MyEvent1', ua.ObjectIds.BaseEventType, |
||
| 471 | [('PropertyNum', ua.VariantType.Float), |
||
| 472 | ('PropertyString', ua.VariantType.String)]) |
||
| 473 | evgen1 = await opc.server.get_event_generator(etype1, o) |
||
| 474 | etype2 = await opc.server.create_custom_event_type(2, 'MyEvent2', ua.ObjectIds.BaseEventType, |
||
| 475 | [('PropertyNum', ua.VariantType.Float), |
||
| 476 | ('PropertyString', ua.VariantType.String)]) |
||
| 477 | evgen2 = await opc.server.get_event_generator(etype2, o) |
||
| 478 | myhandler = MySubHandler2() |
||
| 479 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 480 | handle = await sub.subscribe_events(o, etype1) |
||
| 481 | propertynum1 = 1 |
||
| 482 | propertystring1 = "This is my test 1" |
||
| 483 | evgen1.event.PropertyNum = propertynum1 |
||
| 484 | evgen1.event.PropertyString = propertystring1 |
||
| 485 | propertynum2 = 2 |
||
| 486 | propertystring2 = "This is my test 2" |
||
| 487 | evgen2.event.PropertyNum = propertynum2 |
||
| 488 | evgen2.event.PropertyString = propertystring2 |
||
| 489 | for i in range(3): |
||
| 490 | evgen1.trigger() |
||
| 491 | evgen2.trigger() |
||
| 492 | await sleep(1) # ToDo: replace |
||
| 493 | assert 3 == len(myhandler.results) |
||
| 494 | ev = myhandler.results[-1] |
||
| 495 | assert etype1.nodeid == ev.EventType |
||
| 496 | handle = await sub.subscribe_events(o, etype2) |
||
| 497 | for i in range(4): |
||
| 498 | evgen1.trigger() |
||
| 499 | evgen2.trigger() |
||
| 500 | await sleep(1) # ToDo: replace |
||
| 501 | ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid] |
||
| 502 | ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid] |
||
| 503 | assert 11 == len(myhandler.results) |
||
| 504 | assert 4 == len(ev2s) |
||
| 505 | assert 7 == len(ev1s) |
||
| 506 | await sub.unsubscribe(handle) |
||
| 507 | await sub.delete() |
||
| 508 | |||
| 509 | |||
| 510 | async def test_several_different_events_2(opc): |
||
| 511 | objects = opc.server.get_objects_node() |
||
| 512 | o = await objects.add_object(3, 'MyObject') |
||
| 513 | etype1 = await opc.server.create_custom_event_type( |
||
| 514 | 2, 'MyEvent1', ua.ObjectIds.BaseEventType, |
||
| 515 | [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)] |
||
| 516 | ) |
||
| 517 | evgen1 = await opc.server.get_event_generator(etype1, o) |
||
| 518 | etype2 = await opc.server.create_custom_event_type( |
||
| 519 | 2, 'MyEvent2', ua.ObjectIds.BaseEventType, |
||
| 520 | [('PropertyNum2', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)] |
||
| 521 | ) |
||
| 522 | evgen2 = await opc.server.get_event_generator(etype2, o) |
||
| 523 | etype3 = await opc.server.create_custom_event_type( |
||
| 524 | 2, 'MyEvent3', ua.ObjectIds.BaseEventType, |
||
| 525 | [('PropertyNum3', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)] |
||
| 526 | ) |
||
| 527 | evgen3 = await opc.server.get_event_generator(etype3, o) |
||
| 528 | myhandler = MySubHandler2() |
||
| 529 | sub = await opc.opc.create_subscription(100, myhandler) |
||
| 530 | handle = await sub.subscribe_events(o, [etype1, etype3]) |
||
| 531 | propertynum1 = 1 |
||
| 532 | propertystring1 = "This is my test 1" |
||
| 533 | evgen1.event.PropertyNum = propertynum1 |
||
| 534 | evgen1.event.PropertyString = propertystring1 |
||
| 535 | propertynum2 = 2 |
||
| 536 | propertystring2 = "This is my test 2" |
||
| 537 | evgen2.event.PropertyNum2 = propertynum2 |
||
| 538 | evgen2.event.PropertyString = propertystring2 |
||
| 539 | propertynum3 = 3 |
||
| 540 | propertystring3 = "This is my test 3" |
||
| 541 | evgen3.event.PropertyNum3 = propertynum3 |
||
| 542 | evgen3.event.PropertyString = propertystring2 |
||
| 543 | for i in range(3): |
||
| 544 | evgen1.trigger() |
||
| 545 | evgen2.trigger() |
||
| 546 | evgen3.trigger() |
||
| 547 | evgen3.event.PropertyNum3 = 9999 |
||
| 548 | evgen3.trigger() |
||
| 549 | await sleep(1) |
||
| 550 | ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid] |
||
| 551 | ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid] |
||
| 552 | ev3s = [ev for ev in myhandler.results if ev.EventType == etype3.nodeid] |
||
| 553 | assert 7 == len(myhandler.results) |
||
| 554 | assert 3 == len(ev1s) |
||
| 555 | assert 0 == len(ev2s) |
||
| 556 | assert 4 == len(ev3s) |
||
| 557 | assert propertynum1 == ev1s[0].PropertyNum |
||
| 558 | assert propertynum3 == ev3s[0].PropertyNum3 |
||
| 559 | assert 9999 == ev3s[-1].PropertyNum3 |
||
| 560 | assert ev1s[0].PropertyNum3 is None |
||
| 561 | await sub.unsubscribe(handle) |
||
| 562 | await sub.delete() |
||
| 563 |