Passed
Pull Request — master (#348)
by
unknown
03:16
created

ProgramStateMachineTypeClass.install()   A

Complexity

Conditions 1

Size

Total Lines 25
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 21
nop 2
dl 0
loc 25
rs 9.376
c 0
b 0
f 0
1
import asyncio, logging
2
3
from asyncua import Server, ua, Node
4
from asyncua.common.instantiate_util import instantiate
5
6
class StateMachineTypeClass(object):
    '''
    Implementation of a StateMachineType.

    Wraps an object node that is instantiated below ``parent`` from the
    type node ``ua.NodeId(2299, 0)`` and keeps references to its
    ``CurrentState`` and (optionally) ``LastTransition`` variables so that
    state changes can be written to the address space.
    '''
    def __init__(self, server=None, parent=None, idx=None, name=None):
        '''
        server: server object the state machine belongs to (required)
        parent: node below which the state machine object is created (required)
        idx: namespace index for the new object, defaults to the parent's
        name: browse name of the new object, defaults to "StateMachine"

        Raises ValueError if server or parent is missing.
        '''
        # TODO: change to check the instance type instead of truthiness
        if not server:
            raise ValueError("server is required")
        if not parent:
            raise ValueError("parent is required")
        if idx is None:
            idx = parent.nodeid.NamespaceIndex
        if name is None:
            name = "StateMachine"
        self._server = server
        self._parent = parent
        self._state_machine_node = None
        self._state_machine_type = ua.NodeId(2299, 0)
        self._name = name
        self._idx = idx
        self._optionals = False
        self._current_state_node = None
        self._current_state_id_node = None
        # Same attribute names that install() fills and write_transition() reads.
        self._last_transition_node = None
        self._last_transition_id_node = None

    async def install(self, optionals=False):
        '''
        Set up the address space and resolve the child nodes.

        optionals: when true the object is instantiated with its optional
                   children and the LastTransition nodes are resolved too.
        '''
        self._optionals = optionals
        self._state_machine_node = await self._parent.add_object(
            self._idx,
            self._name,
            objecttype=self._state_machine_type,
            instantiate_optional=optionals,
        )
        self._current_state_node = await self._state_machine_node.get_child(["CurrentState"])
        self._current_state_id_node = await self._state_machine_node.get_child(["CurrentState", "Id"])
        if self._optionals:
            # Stored under the names write_transition() uses; the previous
            # "_current_transition_*" names were never read by anything.
            self._last_transition_node = await self._state_machine_node.get_child(["LastTransition"])
            self._last_transition_id_node = await self._state_machine_node.get_child(["LastTransition", "Id"])

        # TODO: initialise values
        # Maybe it is smart to check the parent's children for an initial state
        # instance (InitialStateType) and initialise with its id, but what if
        # no state instance is provided ...

    async def change_state(self, state_name, state_node, transition_name=None, transition_node=None):
        '''
        state_name: ua.LocalizedText()
        state_node: ua.NodeId() <- StateType node
        transition_name: ua.LocalizedText()
        transition_node: ua.NodeId() <- TransitionType node

        The transition is only written when the state machine was installed
        with optionals and both transition arguments are given.
        '''
        # TODO: check types/class
        # TODO: check StateType exists
        # TODO: check TransitionType exists
        await self.write_state(state_name, state_node)
        if self._optionals and transition_name and transition_node:
            await self.write_transition(transition_name, transition_node)

    async def write_state(self, state_name, state_node):
        '''
        Write state_name to CurrentState and state_node to CurrentState/Id.
        '''
        # TODO: check types/class
        await self._current_state_node.write_value(state_name)
        await self._current_state_id_node.write_value(state_node)

    async def write_transition(self, transition_name, transition_node):
        '''
        Write transition_name to LastTransition and transition_node to
        LastTransition/Id. Requires install() to have run with optionals=True.
        '''
        # TODO: check types/class
        await self._last_transition_node.write_value(transition_name)
        await self._last_transition_id_node.write_value(transition_node)
67
68
class FiniteStateMachineTypeClass(StateMachineTypeClass):
    '''
    Implementation of a FiniteStateMachineType.

    Behaves like StateMachineTypeClass but instantiates the object from the
    type node ``ua.NodeId(2771, 0)`` and additionally keeps lists of the
    available states and transitions.
    '''
    def __init__(self, server=None, parent=None, idx=None, name=None):
        '''
        Same arguments as StateMachineTypeClass; name defaults to
        "FiniteStateMachine".
        '''
        # Resolve the default before delegating: the base constructor stores
        # the name, so assigning it afterwards would have no effect.
        if name is None:
            name = "FiniteStateMachine"
        super().__init__(server, parent, idx, name)
        self._state_machine_type = ua.NodeId(2771, 0)
        self._avalible_states = []
        self._avalible_transitions = []

    async def install(self, optionals=False):
        '''
        Set up the address space and initialize.

        Delegates to the base implementation, which instantiates the object
        from self._state_machine_type and resolves the CurrentState (and,
        with optionals, LastTransition) nodes that change_state() writes to.
        '''
        await super().install(optionals)

    async def set_avalible_states(self, states):
        '''
        Replace the stored list of available states.
        '''
        self._avalible_states = states

    async def set_avalible_transitions(self, transitions):
        '''
        Replace the stored list of available transitions.
        '''
        self._avalible_transitions = transitions
92
93
class ExclusiveLimitStateMachineTypeClass(FiniteStateMachineTypeClass):
    '''
    NOT IMPLEMENTED "ExclusiveLimitStateMachineType"

    Placeholder only: the constructor always raises, so no usable instance
    can be created yet.
    '''
    def __init__(self, server=None, parent=None, idx=None, name=None):
        '''
        Raises ValueError (from the base constructor) when server or parent
        is missing, otherwise NotImplementedError.
        '''
        # Default name resolved first and idx/name forwarded, so the values
        # are in place once the class gets a real implementation.
        if name is None:
            name = "ExclusiveLimitStateMachine"
        super().__init__(server, parent, idx, name)
        self._state_machine_type = ua.NodeId(9318, 0)
        raise NotImplementedError
103
104
class FileTransferStateMachineTypeClass(FiniteStateMachineTypeClass):
    '''
    NOT IMPLEMENTED "FileTransferStateMachineType"

    Placeholder only: the constructor always raises, so no usable instance
    can be created yet.
    '''
    def __init__(self, server=None, parent=None, idx=None, name=None):
        '''
        Raises ValueError (from the base constructor) when server or parent
        is missing, otherwise NotImplementedError.
        '''
        # Default name resolved first and idx/name forwarded, so the values
        # are in place once the class gets a real implementation.
        if name is None:
            name = "FileTransferStateMachine"
        super().__init__(server, parent, idx, name)
        self._state_machine_type = ua.NodeId(15803, 0)
        raise NotImplementedError
114
115
class ProgramStateMachineTypeClass(FiniteStateMachineTypeClass):
    '''
    Implementation of a ProgramStateMachineType.

    https://reference.opcfoundation.org/v104/Core/docs/Part10/4.2.3/

    The object is instantiated from the type node ``ua.NodeId(2391, 0)``.
    The state, transition and method node attributes are placeholders that
    are still set to None by install(); they have to be populated before
    any transition coroutine can succeed.
    '''
    def __init__(self, server=None, parent=None, idx=None, name=None):
        '''
        Same arguments as FiniteStateMachineTypeClass; name defaults to
        "ProgramStateMachine".
        '''
        # Resolve the default before delegating: the base constructor stores
        # the name, so assigning it afterwards would have no effect.
        if name is None:
            name = "ProgramStateMachine"
        super().__init__(server, parent, idx, name)
        self._state_machine_type = ua.NodeId(2391, 0)
        # _apply_transition() reads both attributes, so make sure they exist
        # even when install() ran without optionals.
        self._last_transition_node = None
        self._last_transition_id_node = None
        self._reset_child_nodes()

    def _reset_child_nodes(self):
        '''
        Set every state, transition and method node reference to None.
        '''
        self._ready_state = None #State node
        self._halted_state = None #State node
        self._running_state = None #State node
        self._suspended_state = None #State node

        self._halted_to_ready = None #Transition node
        self._ready_to_running = None #Transition node
        self._running_to_halted = None #Transition node
        self._running_to_ready = None #Transition node
        self._running_to_suspended = None #Transition node
        self._suspended_to_running = None #Transition node
        self._suspended_to_halted = None #Transition node
        self._suspended_to_ready = None #Transition node
        self._ready_to_halted = None #Transition node

        self._halt_method_node = None #uamethod node
        self._reset_method_node = None #uamethod node
        self._resume_method_node = None #uamethod node
        self._start_method_node = None #uamethod node
        self._suspend_method_node = None #uamethod node

    async def install(self, optionals=False):
        '''
        Set up the address space and initialize.

        Instantiates the object below the parent and resolves the
        CurrentState nodes (and, with optionals, the LastTransition nodes)
        that the transition coroutines write to.
        '''
        self._optionals = optionals
        self._state_machine_node = await self._parent.add_object(
            self._idx,
            self._name,
            objecttype=self._state_machine_type,
            instantiate_optional=optionals,
        )
        self._current_state_node = await self._state_machine_node.get_child(["CurrentState"])
        self._current_state_id_node = await self._state_machine_node.get_child(["CurrentState", "Id"])
        # NOTE(review): LastTransition is only looked up with optionals, the
        # same rule StateMachineTypeClass.install() applies - confirm whether
        # it is always present on a ProgramStateMachineType instance.
        if optionals:
            self._last_transition_node = await self._state_machine_node.get_child(["LastTransition"])
            self._last_transition_id_node = await self._state_machine_node.get_child(["LastTransition", "Id"])
        # TODO: get child nodes (states, transitions, methods); until then
        # they are reset to None.
        self._reset_child_nodes()

    async def _apply_transition(self, state_name, state_node, transition_name, transition_node):
        '''
        Write the new current state and, if available, the last transition.

        state_name / transition_name: plain strings, wrapped in an "en-US"
                                      ua.LocalizedText before writing
        state_node / transition_node: nodes whose nodeid is written to the
                                      corresponding Id variable

        Returns ua.StatusCode Good.
        '''
        # Read both ids up front so a missing node fails before anything has
        # been written to the address space.
        state_id = state_node.nodeid
        transition_id = transition_node.nodeid
        await self._current_state_node.write_value(ua.LocalizedText(state_name, "en-US"), varianttype=ua.VariantType.LocalizedText)
        await self._current_state_id_node.write_value(state_id, varianttype=ua.VariantType.NodeId)
        if self._last_transition_node is not None and self._last_transition_id_node is not None:
            await self._last_transition_node.write_value(ua.LocalizedText(transition_name, "en-US"), varianttype=ua.VariantType.LocalizedText)
            await self._last_transition_id_node.write_value(transition_id, varianttype=ua.VariantType.NodeId)
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)

    #Transition
    async def HaltedToReady(self):
        return await self._apply_transition("Ready", self._ready_state, "HaltedToReady", self._halted_to_ready)

    #Transition
    async def ReadyToRunning(self):
        return await self._apply_transition("Running", self._running_state, "ReadyToRunning", self._ready_to_running)

    #Transition
    async def RunningToHalted(self):
        return await self._apply_transition("Halted", self._halted_state, "RunningToHalted", self._running_to_halted)

    #Transition
    async def RunningToReady(self):
        return await self._apply_transition("Ready", self._ready_state, "RunningToReady", self._running_to_ready)

    #Transition
    async def RunningToSuspended(self):
        return await self._apply_transition("Suspended", self._suspended_state, "RunningToSuspended", self._running_to_suspended)

    #Transition
    async def SuspendedToRunning(self):
        return await self._apply_transition("Running", self._running_state, "SuspendedToRunning", self._suspended_to_running)

    #Transition
    async def SuspendedToHalted(self):
        return await self._apply_transition("Halted", self._halted_state, "SuspendedToHalted", self._suspended_to_halted)

    #Transition
    async def SuspendedToReady(self):
        return await self._apply_transition("Ready", self._ready_state, "SuspendedToReady", self._suspended_to_ready)

    #Transition
    async def ReadyToHalted(self):
        return await self._apply_transition("Halted", self._halted_state, "ReadyToHalted", self._ready_to_halted)

    #method to be linked to uamethod
    async def Start(self):
        '''
        Ready -> Running; BadNotExecutable from any other state.
        '''
        if await self._current_state_node.read_value() == ua.LocalizedText("Ready", "en-US"):
            return await self.ReadyToRunning()
        return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)

    #method to be linked to uamethod
    async def Suspend(self):
        '''
        Running -> Suspended; BadNotExecutable from any other state.
        '''
        if await self._current_state_node.read_value() == ua.LocalizedText("Running", "en-US"):
            return await self.RunningToSuspended()
        return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)

    #method to be linked to uamethod
    async def Resume(self):
        '''
        Suspended -> Running; BadNotExecutable from any other state.
        '''
        if await self._current_state_node.read_value() == ua.LocalizedText("Suspended", "en-US"):
            return await self.SuspendedToRunning()
        return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)

    #method to be linked to uamethod
    async def Halt(self):
        '''
        Ready, Running or Suspended -> Halted; BadNotExecutable otherwise.
        '''
        # Read once so all three comparisons see the same value.
        current = await self._current_state_node.read_value()
        if current == ua.LocalizedText("Ready", "en-US"):
            return await self.ReadyToHalted()
        if current == ua.LocalizedText("Running", "en-US"):
            return await self.RunningToHalted()
        if current == ua.LocalizedText("Suspended", "en-US"):
            return await self.SuspendedToHalted()
        return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)

    #method to be linked to uamethod
    async def Reset(self):
        '''
        Halted -> Ready; BadNotExecutable from any other state.
        '''
        if await self._current_state_node.read_value() == ua.LocalizedText("Halted", "en-US"):
            return await self.HaltedToReady()
        return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
281
282
class ShelvedStateMachineTypeClass(FiniteStateMachineTypeClass):
    '''
    NOT IMPLEMENTED "ShelvedStateMachineType"

    Placeholder only: the constructor always raises, so no usable instance
    can be created yet.
    '''
    def __init__(self, server=None, parent=None, idx=None, name=None):
        '''
        Raises ValueError (from the base constructor) when server or parent
        is missing, otherwise NotImplementedError.
        '''
        # Default name resolved first and idx/name forwarded, so the values
        # are in place once the class gets a real implementation.
        if name is None:
            name = "ShelvedStateMachine"
        super().__init__(server, parent, idx, name)
        self._state_machine_type = ua.NodeId(2929, 0)
        raise NotImplementedError
292
293
294
#Devtests
295
async def main():
    '''
    Development test: start a server, install one state machine of each
    implemented kind below the Objects node and keep the server running.
    '''
    logging.basicConfig(level=logging.INFO)

    server = Server()
    await server.init()

    sm = StateMachineTypeClass(server, server.nodes.objects, 0, "StateMachine")
    await sm.install(True)
    await sm.change_state(ua.LocalizedText("Test", "en-US"), server.get_node("ns=0;i=2253").nodeid)
    fsm = FiniteStateMachineTypeClass(server, server.nodes.objects, 0, "FiniteStateMachine")
    await fsm.install(True)
    pfsm = ProgramStateMachineTypeClass(server, server.nodes.objects, 0, "ProgramStateMachine")
    await pfsm.install(True)

    async with server:
        # Sleep for a real interval: sleep(0) only yields to the event loop
        # and would keep this coroutine spinning at full CPU.
        while True:
            await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
316