@@ 144-158 (lines=15) @@ | ||
141 | self.assertEqual(myhandler.datachange_count, 1) |
|
142 | sub.delete() |
|
143 | ||
144 | def test_subscription_count_empty(self): |
|
145 | myhandler = MySubHandlerCounter() |
|
146 | sub = self.opc.create_subscription(1, myhandler) |
|
147 | o = self.opc.get_objects_node() |
|
148 | var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3]) |
|
149 | sub.subscribe_data_change(var) |
|
150 | while True: |
|
151 | val = var.get_value() |
|
152 | val.pop() |
|
153 | var.set_value(val, ua.VariantType.Double) |
|
154 | if not val: |
|
155 | break |
|
156 | time.sleep(0.2) # let last event arrive |
|
157 | self.assertEqual(myhandler.datachange_count, 4) |
|
158 | sub.delete() |
|
159 | ||
160 | def test_subscription_overload_simple(self): |
|
161 | nb = 10 |
|
@@ 115-128 (lines=14) @@ | ||
112 | self.assertEqual(myhandler.datachange_count, nb + 1) |
|
113 | sub.delete() |
|
114 | ||
115 | def test_subscription_count_list(self): |
|
116 | myhandler = MySubHandlerCounter() |
|
117 | sub = self.opc.create_subscription(1, myhandler) |
|
118 | o = self.opc.get_objects_node() |
|
119 | var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2]) |
|
120 | sub.subscribe_data_change(var) |
|
121 | nb = 12 |
|
122 | for i in range(nb): |
|
123 | val = var.get_value() |
|
124 | val.append(i) |
|
125 | var.set_value(val) |
|
126 | time.sleep(0.2) # let last event arrive |
|
127 | self.assertEqual(myhandler.datachange_count, nb + 1) |
|
128 | sub.delete() |
|
129 | ||
130 | def test_subscription_count_no_change(self): |
|
131 | myhandler = MySubHandlerCounter() |