|
@@ 144-158 (lines=15) @@
|
| 141 |
|
self.assertEqual(myhandler.datachange_count, 1) |
| 142 |
|
sub.delete() |
| 143 |
|
|
| 144 |
|
def test_subscription_count_empty(self):
    """Count data-change notifications while a list variable is emptied.

    A three-element list variable is subscribed to, then shrunk by one
    element per write until it is empty. The handler is expected to have
    recorded 4 data changes -- presumably the initial-value notification
    plus one per write (TODO confirm against MySubHandlerCounter).
    """
    myhandler = MySubHandlerCounter()
    sub = self.opc.create_subscription(1, myhandler)
    # Everything after the subscription exists runs under try/finally so
    # the subscription is deleted even when an assertion or a call fails.
    try:
        o = self.opc.get_objects_node()
        var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3])
        sub.subscribe_data_change(var)
        while True:
            # Re-read the value on every pass so each write is based on
            # what the variable currently holds.
            val = var.get_value()
            val.pop()
            # The variant type is passed explicitly on every write,
            # including the final one where the list is empty.
            var.set_value(val, ua.VariantType.Double)
            if not val:
                break
        time.sleep(0.2)  # let last event arrive
        self.assertEqual(myhandler.datachange_count, 4)
    finally:
        sub.delete()
| 159 |
|
|
| 160 |
|
def test_subscription_overload_simple(self): |
| 161 |
|
nb = 10 |
|
@@ 115-128 (lines=14) @@
|
| 112 |
|
self.assertEqual(myhandler.datachange_count, nb + 1) |
| 113 |
|
sub.delete() |
| 114 |
|
|
| 115 |
|
def test_subscription_count_list(self):
    """Count data-change notifications while a list variable grows.

    A two-element list variable is subscribed to, then extended by one
    element per write for ``nb`` writes. The handler is expected to have
    recorded ``nb + 1`` data changes -- presumably the initial-value
    notification plus one per write (TODO confirm against
    MySubHandlerCounter).
    """
    myhandler = MySubHandlerCounter()
    sub = self.opc.create_subscription(1, myhandler)
    # Everything after the subscription exists runs under try/finally so
    # the subscription is deleted even when an assertion or a call fails.
    try:
        o = self.opc.get_objects_node()
        var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
        sub.subscribe_data_change(var)
        nb = 12
        for i in range(nb):
            # Re-read the value on every pass so each write extends what
            # the variable currently holds.
            val = var.get_value()
            val.append(i)
            var.set_value(val)
        time.sleep(0.2)  # let last event arrive
        self.assertEqual(myhandler.datachange_count, nb + 1)
    finally:
        sub.delete()
| 129 |
|
|
| 130 |
|
def test_subscription_count_no_change(self): |
| 131 |
|
myhandler = MySubHandlerCounter() |