Passed
Pull Request — master (#348)
by
unknown
02:42
created

asyncua.common.statemachine.main()   A

Complexity

Conditions 3

Size

Total Lines 17
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 14
nop 0
dl 0
loc 17
rs 9.7
c 0
b 0
f 0
1
import asyncio, logging
2
3
from asyncua import Server, ua, Node
4
from asyncua.common.instantiate_util import instantiate
5
6
class StateMachineTypeClass(object):
7
    '''
8
    Implementation of an StateMachineType
9
    '''
10
    def __init__(self, server=None, parent=None, idx=None, name=None):
11
        if not server: raise ValueError #change to check instance type
12
        if not parent: raise ValueError #change to check instance type
13
        if idx == None:
14
            idx = parent.nodeid.NamespaceIndex
15
        if name == None:
16
            name = "StateMachine"
17
        self._server = server
18
        self._parent = parent
19
        self._state_machine_node = None
20
        self._state_machine_type = ua.NodeId(2299, 0)
21
        self._name = name
22
        self._idx = idx
23
        self._current_state = ua.LocalizedText() #Variable LocalizedText
24
        self._current_state_id = None #Property NodeId        
25
        self._last_transition = ua.LocalizedText() #Variable LocalizedText
26
        self._last_transition_id = None #Property NodeId   
27
        self._optionals = False
28
29
    async def install(self, optionals=False):
30
        '''
31
        setup adressspace and initialize 
32
        '''
33
        self._optionals = optionals
34
        self._state_machine_node = await self._parent.add_object(self._idx, self._name, objecttype=self._state_machine_type, instantiate_optional=optionals)
35
    
36
    async def change_state(self, state_name, state, transition_name, transition=None):
37
        #check types: names = string and others are nodetype
38
        self._current_state = state #Variable LocalizedText
39
        self._current_state_id = None #Property NodeId   
40
        if self._optionals:
41
            self._last_transition = transition #Variable LocalizedText
42
            self._last_transition_id = None #Property NodeId   
43
        #write
44
45
class FiniteStateMachineTypeClass(StateMachineTypeClass):
46
    '''
47
    Implementation of an FiniteStateMachineType
48
    '''
49
    def __init__(self, server=None, parent=None, idx=None, name=None):
50
        super().__init__(server, parent, idx, name)
51
        if name == None:
52
            name = "FiniteStateMachine"
53
        self._state_machine_type = ua.NodeId(2771, 0)
54
        self._avalible_states = []
55
        self._avalible_transitions = []
56
57
    async def install(self, optionals=False):
58
        '''
59
        setup adressspace and initialize 
60
        '''
61
        self._optionals = optionals
62
        self._state_machine_node = await self._parent.add_object(self._idx, self._name, objecttype=self._state_machine_type, instantiate_optional=optionals)
63
64
    async def set_avalible_states(self, states):
65
        self._avalible_states = states
66
67
    async def set_avalible_transitions(self, transitions):
68
        self._avalible_transitions = transitions      
69
70
class ExclusiveLimitStateMachineTypeClass(FiniteStateMachineTypeClass):
71
    '''
72
    NOT IMPLEMENTED "ExclusiveLimitStateMachineType"
73
    '''
74
    def __init__(self, server=None, parent=None, idx=None, name=None):
75
        super().__init__(server, parent)
76
        if name == None:
77
            name = "ExclusiveLimitStateMachine"
78
        self._state_machine_type = ua.NodeId(9318, 0)
79
        raise NotImplementedError
80
81
class FileTransferStateMachineTypeClass(FiniteStateMachineTypeClass):
82
    '''
83
    NOT IMPLEMENTED "FileTransferStateMachineType"
84
    '''
85
    def __init__(self, server=None, parent=None, idx=None, name=None):
86
        super().__init__(server, parent)
87
        if name == None:
88
            name = "FileTransferStateMachine"
89
        self._state_machine_type = ua.NodeId(15803, 0)
90
        raise NotImplementedError
91
92
class ProgramStateMachineTypeClass(FiniteStateMachineTypeClass):
93
    '''
94
    https://reference.opcfoundation.org/v104/Core/docs/Part10/4.2.3/
95
    '''
96
    def __init__(self, server=None, parent=None, idx=None, name=None):
97
        super().__init__(server, parent, idx, name)
98
        if name == None:
99
            name = "ProgramStateMachine"
100
        self._state_machine_type = ua.NodeId(2391, 0)
101
        self._ready_state = None #State node
102
        self._halted_state = None #State node
103
        self._running_state = None #State node
104
        self._suspended_state = None #State node
105
106
        self._halted_to_ready = None #Transition node
107
        self._ready_to_running = None #Transition node
108
        self._running_to_halted = None #Transition node
109
        self._running_to_ready = None #Transition node
110
        self._running_to_suspended = None #Transition node
111
        self._suspended_to_running = None #Transition node
112
        self._suspended_to_halted = None #Transition node
113
        self._suspended_to_ready = None #Transition node
114
        self._ready_to_halted = None #Transition node
115
116
        self._halt_method_node = None #uamethod node
117
        self._reset_method_node = None #uamethod node
118
        self._resume_method_node = None #uamethod node
119
        self._start_method_node = None #uamethod node
120
        self._suspend_method_node = None #uamethod node
121
122
    async def install(self, optionals=False):
123
        '''
124
        setup adressspace and initialize 
125
        '''
126
        self._optionals = optionals
127
        self._state_machine_node = await self._parent.add_object(self._idx, self._name, objecttype=self._state_machine_type, instantiate_optional=optionals)
128
        #get childnodes:
129
        self._ready_state = None #State node
130
        self._halted_state = None #State node
131
        self._running_state = None #State node
132
        self._suspended_state = None #State node
133
        self._halted_to_ready = None #Transition node
134
        self._ready_to_running = None #Transition node
135
        self._running_to_halted = None #Transition node
136
        self._running_to_ready = None #Transition node
137
        self._running_to_suspended = None #Transition node
138
        self._suspended_to_running = None #Transition node
139
        self._suspended_to_halted = None #Transition node
140
        self._suspended_to_ready = None #Transition node
141
        self._ready_to_halted = None #Transition node
142
        self._halt_method_node = None #uamethod node
143
        self._reset_method_node = None #uamethod node
144
        self._resume_method_node = None #uamethod node
145
        self._start_method_node = None #uamethod node
146
        self._suspend_method_node = None #uamethod node
147
148
    #Transition
149
    async def HaltedToReady(self):
150
        await self._current_state.write_value(ua.LocalizedText("Ready", "en-US"), varianttype=ua.VariantType.LocalizedText)
151
        await self._current_state_id.write_value(self._ready_state.nodeid, varianttype=ua.VariantType.NodeId)
152
        await self._last_transition.write_value(ua.LocalizedText("HaltedToReady", "en-US"), varianttype=ua.VariantType.LocalizedText)
153
        await self._last_transition_id.write_value(self._halted_to_ready.nodeid, varianttype=ua.VariantType.NodeId)
154
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
155
156
    #Transition
157
    async def ReadyToRunning(self):
158
        await self._current_state.write_value(ua.LocalizedText("Running", "en-US"), varianttype=ua.VariantType.LocalizedText)
159
        await self._current_state_id.write_value(self._running_state.nodeid, varianttype=ua.VariantType.NodeId)
160
        await self._last_transition.write_value(ua.LocalizedText("ReadyToRunning", "en-US"), varianttype=ua.VariantType.LocalizedText)
161
        await self._last_transition_id.write_value(self._ready_to_running.nodeid, varianttype=ua.VariantType.NodeId)
162
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
163
164
    #Transition
165
    async def RunningToHalted(self):
166
        await self._current_state.write_value(ua.LocalizedText("Halted", "en-US"), varianttype=ua.VariantType.LocalizedText)
167
        await self._current_state_id.write_value(self._halted_state.nodeid, varianttype=ua.VariantType.NodeId)
168
        await self._last_transition.write_value(ua.LocalizedText("RunningToHalted", "en-US"), varianttype=ua.VariantType.LocalizedText)
169
        await self._last_transition_id.write_value(self._running_to_halted.nodeid, varianttype=ua.VariantType.NodeId)
170
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
171
172
    #Transition
173
    async def RunningToReady(self):
174
        await self._current_state.write_value(ua.LocalizedText("Ready", "en-US"), varianttype=ua.VariantType.LocalizedText)
175
        await self._current_state_id.write_value(self._ready_state.nodeid, varianttype=ua.VariantType.NodeId)
176
        await self._last_transition.write_value(ua.LocalizedText("RunningToReady", "en-US"), varianttype=ua.VariantType.LocalizedText)
177
        await self._last_transition_id.write_value(self._running_to_ready.nodeid, varianttype=ua.VariantType.NodeId)
178
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
179
180
    #Transition
181
    async def RunningToSuspended(self):
182
        await self._current_state.write_value(ua.LocalizedText("Suspended", "en-US"), varianttype=ua.VariantType.LocalizedText)
183
        await self._current_state_id.write_value(self._suspended_state.nodeid, varianttype=ua.VariantType.NodeId)
184
        await self._last_transition.write_value(ua.LocalizedText("RunningToSuspended", "en-US"), varianttype=ua.VariantType.LocalizedText)
185
        await self._last_transition_id.write_value(self._running_to_suspended.nodeid, varianttype=ua.VariantType.NodeId)
186
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
187
188
    #Transition 
189
    async def SuspendedToRunning(self):
190
        await self._current_state.write_value(ua.LocalizedText("Running", "en-US"), varianttype=ua.VariantType.LocalizedText)
191
        await self._current_state_id.write_value(self._running_state.nodeid, varianttype=ua.VariantType.NodeId)
192
        await self._last_transition.write_value(ua.LocalizedText("SuspendedToRunning", "en-US"), varianttype=ua.VariantType.LocalizedText)
193
        await self._last_transition_id.write_value(self._suspended_to_running.nodeid, varianttype=ua.VariantType.NodeId)
194
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
195
196
    #Transition
197
    async def SuspendedToHalted(self):
198
        await self._current_state.write_value(ua.LocalizedText("Halted", "en-US"), varianttype=ua.VariantType.LocalizedText)
199
        await self._current_state_id.write_value(self._halted_state.nodeid, varianttype=ua.VariantType.NodeId)
200
        await self._last_transition.write_value(ua.LocalizedText("SuspendedToHalted", "en-US"), varianttype=ua.VariantType.LocalizedText)
201
        await self._last_transition_id.write_value(self._suspended_to_halted.nodeid, varianttype=ua.VariantType.NodeId)
202
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
203
204
    #Transition
205
    async def SuspendedToReady(self):
206
        await self._current_state.write_value(ua.LocalizedText("Ready", "en-US"), varianttype=ua.VariantType.LocalizedText)
207
        await self._current_state_id.write_value(self._ready_state.nodeid, varianttype=ua.VariantType.NodeId)
208
        await self._last_transition.write_value(ua.LocalizedText("SuspendedToReady", "en-US"), varianttype=ua.VariantType.LocalizedText)
209
        await self._last_transition_id.write_value(self._suspended_to_ready.nodeid, varianttype=ua.VariantType.NodeId)
210
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
211
212
    #Transition 
213
    async def ReadyToHalted(self):
214
        await self._current_state.write_value(ua.LocalizedText("Halted", "en-US"), varianttype=ua.VariantType.LocalizedText)
215
        await self._current_state_id.write_value(self._halted_state.nodeid, varianttype=ua.VariantType.NodeId)
216
        await self._last_transition.write_value(ua.LocalizedText("ReadyToHalted", "en-US"), varianttype=ua.VariantType.LocalizedText)
217
        await self._last_transition_id.write_value(self._ready_to_halted.nodeid, varianttype=ua.VariantType.NodeId)
218
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
219
220
    #method to be linked to uamethod
221
    async def Start(self):
222
        if await self._current_state.read_value() == ua.LocalizedText("Ready", "en-US"):
223
            return await ReadyToRunning()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ReadyToRunning does not seem to be defined.
Loading history...
224
        else:
225
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
226
227
    #method to be linked to uamethod
228
    async def Suspend(self):
229
        if await self._current_state.read_value() == ua.LocalizedText("Running", "en-US"):
230
            return await RunningToSuspended()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable RunningToSuspended does not seem to be defined.
Loading history...
231
        else:
232
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
233
234
    #method to be linked to uamethod
235
    async def Resume(self):
236
        if await self._current_state.read_value() == ua.LocalizedText("Suspended", "en-US"):
237
            return await SuspendedToRunning()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable SuspendedToRunning does not seem to be defined.
Loading history...
238
        else:
239
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
240
241
    #method to be linked to uamethod
242
    async def Halt(self):
243
        if await self._current_state.read_value() == ua.LocalizedText("Ready", "en-US"):
244
            return await ReadyToHalted()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ReadyToHalted does not seem to be defined.
Loading history...
245
        elif await self._current_state.read_value() == ua.LocalizedText("Running", "en-US"):
246
            return await RunningToHalted()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable RunningToHalted does not seem to be defined.
Loading history...
247
        elif await self._current_state.read_value() == ua.LocalizedText("Suspended", "en-US"):
248
            return await SuspendedToHalted()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable SuspendedToHalted does not seem to be defined.
Loading history...
249
        else:
250
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
251
252
    #method to be linked to uamethod
253
    async def Reset(self):
254
        if await self._current_state.read_value() == ua.LocalizedText("Halted", "en-US"):
255
            return await HaltedToReady()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable HaltedToReady does not seem to be defined.
Loading history...
256
        else:
257
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
258
259
class ShelvedStateMachineTypeClass(FiniteStateMachineTypeClass):
260
    '''
261
    NOT IMPLEMENTED "ShelvedStateMachineType"
262
    '''
263
    def __init__(self, server=None, parent=None, idx=None, name=None):
264
        super().__init__(server, parent)
265
        if name == None:
266
            name = "ShelvedStateMachine"
267
        self._state_machine_type = ua.NodeId(2929, 0)
268
        raise NotImplementedError
269
270
271
#Devtests
272
async def main():
273
    logging.basicConfig(level=logging.INFO)
274
    _logger = logging.getLogger('asyncua')
275
276
    server = Server()
277
    await server.init()
278
279
    sm = StateMachineTypeClass(server, server.nodes.objects, 0, "StateMachine")
280
    await sm.install(True)
281
    fsm = FiniteStateMachineTypeClass(server, server.nodes.objects, 0, "FiniteStateMachine")
282
    await fsm.install(True)
283
    pfsm = ProgramStateMachineTypeClass(server, server.nodes.objects, 0, "ProgramStateMachine")
284
    await pfsm.install(True)
285
286
    async with server:
287
        while 1:
288
            await asyncio.sleep(0)
289
290
if __name__ == "__main__":
291
    asyncio.run(main())
292