Code Duplication    Length = 32-32 lines in 2 locations

tests/tests_subscriptions.py 2 locations

@@ 450-481 (lines=32) @@
447
        sub.unsubscribe(handle)
448
        sub.delete()
449
450
    def test_events_CustomEvent_MyObject(self):
451
        objects = self.srv.get_objects_node()
452
        o = objects.add_object(3, 'MyObject')
453
        etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
454
        evgen = self.srv.get_event_generator(etype, o)
455
456
        myhandler = MySubHandler()
457
        sub = self.opc.create_subscription(100, myhandler)
458
        handle = sub.subscribe_events(o, etype)
459
460
        propertynum = 2
461
        propertystring = "This is my test"
462
        evgen.event.PropertyNum = propertynum
463
        evgen.event.PropertyString = propertystring
464
        tid = datetime.utcnow()
465
        msg = b"this is my msg "
466
        evgen.trigger(tid, msg)
467
468
        ev = myhandler.future.result(10)
469
        self.assertIsNot(ev, None)  # we did not receive event
470
        self.assertEqual(ev.EventType, etype.nodeid)
471
        self.assertEqual(ev.Severity, 1)
472
        self.assertEqual(ev.SourceName, 'MyObject')
473
        self.assertEqual(ev.SourceNode, o.nodeid)
474
        self.assertEqual(ev.Message.Text, msg)
475
        self.assertEqual(ev.Time, tid)
476
        self.assertEqual(ev.PropertyNum, propertynum)
477
        self.assertEqual(ev.PropertyString, propertystring)
478
479
        # time.sleep(0.1)
480
        sub.unsubscribe(handle)
481
        sub.delete()
482
483
    def test_several_different_events(self):
484
        objects = self.srv.get_objects_node()
@@ 417-448 (lines=32) @@
414
        sub.unsubscribe(handle)
415
        sub.delete()
416
417
    def test_events_CustomEvent(self):
418
        etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
419
        evgen = self.srv.get_event_generator(etype)
420
421
        myhandler = MySubHandler()
422
        sub = self.opc.create_subscription(100, myhandler)
423
        handle = sub.subscribe_events(evtype=etype)
424
425
        propertynum = 2
426
        propertystring = "This is my test"
427
        evgen.event.PropertyNum = propertynum
428
        evgen.event.PropertyString = propertystring
429
        serverity = 500
430
        evgen.event.Severity = serverity
431
        tid = datetime.utcnow()
432
        msg = b"this is my msg "
433
        evgen.trigger(tid, msg)
434
435
        ev = myhandler.future.result(10)
436
        self.assertIsNot(ev, None)  # we did not receive event
437
        self.assertEqual(ev.EventType, etype.nodeid)
438
        self.assertEqual(ev.Severity, serverity)
439
        self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
440
        self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
441
        self.assertEqual(ev.Message.Text, msg)
442
        self.assertEqual(ev.Time, tid)
443
        self.assertEqual(ev.PropertyNum, propertynum)
444
        self.assertEqual(ev.PropertyString, propertystring)
445
446
        # time.sleep(0.1)
447
        sub.unsubscribe(handle)
448
        sub.delete()
449
450
    def test_events_CustomEvent_MyObject(self):
451
        objects = self.srv.get_objects_node()