Passed
Pull Request — master (#348)
by
unknown
02:44
created

FiniteStateMachineTypeClass.install()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
import asyncio, logging
2
3
from asyncua import Server, ua, Node
4
from asyncua.common.instantiate_util import instantiate
5
6
class StateMachineTypeClass(object):
7
    '''
8
    Implementation of an StateMachineType
9
    '''
10
    def __init__(self, server=None, parent=None, idx=None, name=None):
11
        #if idx = none log parend idx is used
12
        self._server = server
13
        self._parent = parent
14
        self._state_machine_node = None
15
        self._state_machine_type = ua.NodeId(2299, 0)
16
        self._name = name
17
        self._idx = idx
18
19
        self._current_state = ua.LocalizedText() #Variable LocalizedText
20
        self._current_state_id = None #Property        
21
        self._last_transition = ua.LocalizedText() #Variable LocalizedText
22
        self._last_transition_id = None #Property
23
24
        self._optionals = False
25
26
    async def install(self, optionals=False):
27
        '''
28
        setup adressspace and initialize 
29
        '''
30
        self._optionals = optionals
31
        self._state_machine_node = await self._parent.add_object(self._idx, self._name, objecttype=self._state_machine_type, instantiate_optional=optionals)
32
    
33
    async def change_state(self, state_name, state, transition_name, transition=None):
34
        #check types: names = string and others are nodetype
35
        self._current_state = state #Variable
36
        self._current_state_id = None #Property  
37
        if self._optionals:
38
            self._last_transition = transition #Variable
39
            self._last_transition_id = None #Property
40
        #write
41
42
class FiniteStateMachineTypeClass(StateMachineTypeClass):
43
    '''
44
    Implementation of an FiniteStateMachineType
45
    '''
46
    def __init__(self, server=None, parent=None, idx=None, name=None):
47
        super().__init__(server, parent, idx, name)
48
        self._state_machine_type = ua.NodeId(2771, 0)
49
        self._avalible_states = []
50
        self._avalible_transitions = []
51
52
    async def install(self, optionals=False):
53
        '''
54
        setup adressspace and initialize 
55
        '''
56
        self._optionals = optionals
57
        self._state_machine_node = await self._parent.add_object(self._idx, self._name, objecttype=self._state_machine_type, instantiate_optional=optionals)
58
59
    async def set_avalible_states(self, states):
60
        self._avalible_states = states
61
62
    async def set_avalible_transitions(self, transitions):
63
        self._avalible_transitions = transitions      
64
65
class ExclusiveLimitStateMachineTypeClass(FiniteStateMachineTypeClass):
66
    def __init__(self, server=None, parent=None, idx=None, name=None):
67
        super().__init__(server, parent)
68
        self._state_machine_type = ua.NodeId(9318, 0)
69
        raise NotImplementedError
70
71
class FileTransferStateMachineTypeClass(FiniteStateMachineTypeClass):
72
    def __init__(self, server=None, parent=None, idx=None, name=None):
73
        super().__init__(server, parent)
74
        self._state_machine_type = ua.NodeId(15803, 0)
75
        raise NotImplementedError
76
77
class ProgramStateMachineTypeClass(FiniteStateMachineTypeClass):
78
    '''
79
    https://reference.opcfoundation.org/v104/Core/docs/Part10/4.2.3/
80
    '''
81
    def __init__(self, server=None, parent=None, idx=None, name=None):
82
        super().__init__(server, parent, idx, name)
83
        self._state_machine_type = ua.NodeId(2391, 0)
84
        self._ready_state = None #State node
85
        self._halted_state = None #State node
86
        self._running_state = None #State node
87
        self._suspended_state = None #State node
88
89
        self._halted_to_ready = None #Transition node
90
        self._ready_to_running = None #Transition node
91
        self._running_to_halted = None #Transition node
92
        self._running_to_ready = None #Transition node
93
        self._running_to_suspended = None #Transition node
94
        self._suspended_to_running = None #Transition node
95
        self._suspended_to_halted = None #Transition node
96
        self._suspended_to_ready = None #Transition node
97
        self._ready_to_halted = None #Transition node
98
99
        self._halt_method_node = None #uamethod node
100
        self._reset_method_node = None #uamethod node
101
        self._resume_method_node = None #uamethod node
102
        self._start_method_node = None #uamethod node
103
        self._suspend_method_node = None #uamethod node
104
105
    async def install(self, optionals=False):
106
        '''
107
        setup adressspace and initialize 
108
        '''
109
        self._optionals = optionals
110
        self._state_machine_node = await self._parent.add_object(self._idx, self._name, objecttype=self._state_machine_type, instantiate_optional=optionals)
111
        #get childnodes:
112
        self._ready_state = None #State node
113
        self._halted_state = None #State node
114
        self._running_state = None #State node
115
        self._suspended_state = None #State node
116
        self._halted_to_ready = None #Transition node
117
        self._ready_to_running = None #Transition node
118
        self._running_to_halted = None #Transition node
119
        self._running_to_ready = None #Transition node
120
        self._running_to_suspended = None #Transition node
121
        self._suspended_to_running = None #Transition node
122
        self._suspended_to_halted = None #Transition node
123
        self._suspended_to_ready = None #Transition node
124
        self._ready_to_halted = None #Transition node
125
        self._halt_method_node = None #uamethod node
126
        self._reset_method_node = None #uamethod node
127
        self._resume_method_node = None #uamethod node
128
        self._start_method_node = None #uamethod node
129
        self._suspend_method_node = None #uamethod node
130
131
    #Transition
132
    async def HaltedToReady(self):
133
        await self._current_state.write_value(ua.LocalizedText("Ready", "en-US"), varianttype=ua.VariantType.LocalizedText)
134
        await self._current_state_id.write_value(self._ready_state.nodeid, varianttype=ua.VariantType.NodeId)
135
        await self._last_transition.write_value(ua.LocalizedText("HaltedToReady", "en-US"), varianttype=ua.VariantType.LocalizedText)
136
        await self._last_transition_id.write_value(self._halted_to_ready.nodeid, varianttype=ua.VariantType.NodeId)
137
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
138
139
    #Transition
140
    async def ReadyToRunning(self):
141
        await self._current_state.write_value(ua.LocalizedText("Running", "en-US"), varianttype=ua.VariantType.LocalizedText)
142
        await self._current_state_id.write_value(self._running_state.nodeid, varianttype=ua.VariantType.NodeId)
143
        await self._last_transition.write_value(ua.LocalizedText("ReadyToRunning", "en-US"), varianttype=ua.VariantType.LocalizedText)
144
        await self._last_transition_id.write_value(self._ready_to_running.nodeid, varianttype=ua.VariantType.NodeId)
145
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
146
147
    #Transition
148
    async def RunningToHalted(self):
149
        await self._current_state.write_value(ua.LocalizedText("Halted", "en-US"), varianttype=ua.VariantType.LocalizedText)
150
        await self._current_state_id.write_value(self._halted_state.nodeid, varianttype=ua.VariantType.NodeId)
151
        await self._last_transition.write_value(ua.LocalizedText("RunningToHalted", "en-US"), varianttype=ua.VariantType.LocalizedText)
152
        await self._last_transition_id.write_value(self._running_to_halted.nodeid, varianttype=ua.VariantType.NodeId)
153
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
154
155
    #Transition
156
    async def RunningToReady(self):
157
        await self._current_state.write_value(ua.LocalizedText("Ready", "en-US"), varianttype=ua.VariantType.LocalizedText)
158
        await self._current_state_id.write_value(self._ready_state.nodeid, varianttype=ua.VariantType.NodeId)
159
        await self._last_transition.write_value(ua.LocalizedText("RunningToReady", "en-US"), varianttype=ua.VariantType.LocalizedText)
160
        await self._last_transition_id.write_value(self._running_to_ready.nodeid, varianttype=ua.VariantType.NodeId)
161
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
162
163
    #Transition
164
    async def RunningToSuspended(self):
165
        await self._current_state.write_value(ua.LocalizedText("Suspended", "en-US"), varianttype=ua.VariantType.LocalizedText)
166
        await self._current_state_id.write_value(self._suspended_state.nodeid, varianttype=ua.VariantType.NodeId)
167
        await self._last_transition.write_value(ua.LocalizedText("RunningToSuspended", "en-US"), varianttype=ua.VariantType.LocalizedText)
168
        await self._last_transition_id.write_value(self._running_to_suspended.nodeid, varianttype=ua.VariantType.NodeId)
169
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
170
171
    #Transition 
172
    async def SuspendedToRunning(self):
173
        await self._current_state.write_value(ua.LocalizedText("Running", "en-US"), varianttype=ua.VariantType.LocalizedText)
174
        await self._current_state_id.write_value(self._running_state.nodeid, varianttype=ua.VariantType.NodeId)
175
        await self._last_transition.write_value(ua.LocalizedText("SuspendedToRunning", "en-US"), varianttype=ua.VariantType.LocalizedText)
176
        await self._last_transition_id.write_value(self._suspended_to_running.nodeid, varianttype=ua.VariantType.NodeId)
177
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
178
179
    #Transition
180
    async def SuspendedToHalted(self):
181
        await self._current_state.write_value(ua.LocalizedText("Halted", "en-US"), varianttype=ua.VariantType.LocalizedText)
182
        await self._current_state_id.write_value(self._halted_state.nodeid, varianttype=ua.VariantType.NodeId)
183
        await self._last_transition.write_value(ua.LocalizedText("SuspendedToHalted", "en-US"), varianttype=ua.VariantType.LocalizedText)
184
        await self._last_transition_id.write_value(self._suspended_to_halted.nodeid, varianttype=ua.VariantType.NodeId)
185
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
186
187
    #Transition
188
    async def SuspendedToReady(self):
189
        await self._current_state.write_value(ua.LocalizedText("Ready", "en-US"), varianttype=ua.VariantType.LocalizedText)
190
        await self._current_state_id.write_value(self._ready_state.nodeid, varianttype=ua.VariantType.NodeId)
191
        await self._last_transition.write_value(ua.LocalizedText("SuspendedToReady", "en-US"), varianttype=ua.VariantType.LocalizedText)
192
        await self._last_transition_id.write_value(self._suspended_to_ready.nodeid, varianttype=ua.VariantType.NodeId)
193
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
194
195
    #Transition 
196
    async def ReadyToHalted(self):
197
        await self._current_state.write_value(ua.LocalizedText("Halted", "en-US"), varianttype=ua.VariantType.LocalizedText)
198
        await self._current_state_id.write_value(self._halted_state.nodeid, varianttype=ua.VariantType.NodeId)
199
        await self._last_transition.write_value(ua.LocalizedText("ReadyToHalted", "en-US"), varianttype=ua.VariantType.LocalizedText)
200
        await self._last_transition_id.write_value(self._ready_to_halted.nodeid, varianttype=ua.VariantType.NodeId)
201
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
202
203
    #method to be linked to uamethod
204
    async def Start(self):
205
        if await self._current_state.read_value() == ua.LocalizedText("Ready", "en-US"):
206
            return await ReadyToRunning()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ReadyToRunning does not seem to be defined.
Loading history...
207
        else:
208
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
209
210
    #method to be linked to uamethod
211
    async def Suspend(self):
212
        if await self._current_state.read_value() == ua.LocalizedText("Running", "en-US"):
213
            return await RunningToSuspended()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable RunningToSuspended does not seem to be defined.
Loading history...
214
        else:
215
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
216
217
    #method to be linked to uamethod
218
    async def Resume(self):
219
        if await self._current_state.read_value() == ua.LocalizedText("Suspended", "en-US"):
220
            return await SuspendedToRunning()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable SuspendedToRunning does not seem to be defined.
Loading history...
221
        else:
222
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
223
224
    #method to be linked to uamethod
225
    async def Halt(self):
226
        if await self._current_state.read_value() == ua.LocalizedText("Ready", "en-US"):
227
            return await ReadyToHalted()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ReadyToHalted does not seem to be defined.
Loading history...
228
        elif await self._current_state.read_value() == ua.LocalizedText("Running", "en-US"):
229
            return await RunningToHalted()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable RunningToHalted does not seem to be defined.
Loading history...
230
        elif await self._current_state.read_value() == ua.LocalizedText("Suspended", "en-US"):
231
            return await SuspendedToHalted()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable SuspendedToHalted does not seem to be defined.
Loading history...
232
        else:
233
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
234
235
    #method to be linked to uamethod
236
    async def Reset(self):
237
        if await self._current_state.read_value() == ua.LocalizedText("Halted", "en-US"):
238
            return await HaltedToReady()
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable HaltedToReady does not seem to be defined.
Loading history...
239
        else:
240
            return ua.StatusCode(ua.status_codes.StatusCodes.BadNotExecutable)
241
242
class ShelvedStateMachineTypeClass(FiniteStateMachineTypeClass):
243
    def __init__(self, server=None, parent=None, idx=None, name=None):
244
        super().__init__(server, parent)
245
        self._state_machine_type = ua.NodeId(2929, 0)
246
        raise NotImplementedError
247
248
249
250
251
252
#Devtests
253
254
async def main():
255
    logging.basicConfig(level=logging.INFO)
256
    _logger = logging.getLogger('asyncua')
257
258
    server = Server()
259
    await server.init()
260
261
    sm = StateMachineTypeClass(server, server.nodes.objects, 0, "StateMachine")
262
    await sm.install(True)
263
    fsm = FiniteStateMachineTypeClass(server, server.nodes.objects, 0, "FiniteStateMachine")
264
    await fsm.install(True)
265
    pfsm = ProgramStateMachineTypeClass(server, server.nodes.objects, 0, "ProgramStateMachine")
266
    await pfsm.install(False)
267
268
    async with server:
269
        while 1:
270
            await asyncio.sleep(0)
271
272
if __name__ == "__main__":
273
    asyncio.run(main())
274