Passed
Pull Request — master (#348)
by
unknown
02:43
created

asyncua.common.statemachine   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 218
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 154
dl 0
loc 218
rs 10
c 0
b 0
f 0
wmc 25

21 Methods

Rating   Name   Duplication   Size   Complexity  
A ProgramStateMachineTypeClass.install() 0 19 1
A FiniteStateMachineTypeClass.__init__() 0 5 1
A FiniteStateMachineTypeClass.set_avalible_states() 0 2 1
A ProgramStateMachineTypeClass.SuspendedToRunning() 0 6 1
A ShelvedStateMachineTypeClass.__init__() 0 4 1
A StateMachineTypeClass.__init__() 0 15 1
A ProgramStateMachineTypeClass.ReadyToRunning() 0 6 1
A ProgramStateMachineTypeClass.SuspendedToHalted() 0 6 1
A StateMachineTypeClass.change_state() 0 7 2
A ExclusiveLimitStateMachineTypeClass.__init__() 0 4 1
A ProgramStateMachineTypeClass.RunningToHalted() 0 6 1
A ProgramStateMachineTypeClass.HaltedToReady() 0 6 1
A ProgramStateMachineTypeClass.__init__() 0 17 1
A ProgramStateMachineTypeClass.SuspendedToReady() 0 6 1
A StateMachineTypeClass.install() 0 6 1
A ProgramStateMachineTypeClass.ReadyToHalted() 0 6 1
A ProgramStateMachineTypeClass.RunningToSuspended() 0 6 1
A FiniteStateMachineTypeClass.set_avalible_transitions() 0 2 1
A ProgramStateMachineTypeClass.RunningToReady() 0 6 1
A FileTransferStateMachineTypeClass.__init__() 0 4 1
A FiniteStateMachineTypeClass.install() 0 6 1

1 Function

Rating   Name   Duplication   Size   Complexity  
A main() 0 15 3
1
import asyncio, logging
2
3
from asyncua import Server, ua, Node
4
from asyncua.common.instantiate_util import instantiate
5
6
class StateMachineTypeClass(object):
7
    '''
8
    Implementation of an StateMachineType
9
    '''
10
    def __init__(self, server=None, parent=None, idx=None, name=None):
11
        #if idx = none log parend idx is used
12
        self._server = server
13
        self._parent = parent
14
        self._state_machine_node = None
15
        self._state_machine_type = ua.NodeId(2299, 0)
16
        self._name = name
17
        self._idx = idx
18
19
        self._current_state = "None" #Variable
20
        self._current_state_id = None #Property        
21
        self._last_transition = "None" #Variable
22
        self._last_transition_id = None #Property
23
24
        self._optionals = False
25
26
    async def install(self, optionals=False):
27
        '''
28
        setup adressspace and initialize 
29
        '''
30
        self._optionals = optionals
31
        self._state_machine_node = await self._parent.add_object(self._idx, self._name, objecttype=self._state_machine_type, instantiate_optional=optionals)
32
    
33
    async def change_state(self, state_name, state, transition_name, transition=None):
34
        #check types: names = string and others are nodetype
35
        self._current_state = state #Variable
36
        self._current_state_id = None #Property  
37
        if self._optionals:
38
            self._last_transition = transition #Variable
39
            self._last_transition_id = None #Property
40
        #write
41
42
class FiniteStateMachineTypeClass(StateMachineTypeClass):
43
    '''
44
    Implementation of an FiniteStateMachineType
45
    '''
46
    def __init__(self, server=None, parent=None, idx=None, name=None):
47
        super().__init__(server, parent, idx, name)
48
        self._state_machine_type = ua.NodeId(2771, 0)
49
        self._avalible_states = []
50
        self._avalible_transitions = []
51
52
    async def install(self, instantiate_optional=False):
53
        '''
54
        setup adressspace and initialize 
55
        '''
56
        self._optionals = instantiate_optional
57
        self._state_machine_node = await self._parent.add_object(self._idx, self._name, objecttype=self._state_machine_type, instantiate_optional=instantiate_optional)
58
59
    async def set_avalible_states(self, states):
60
        self._avalible_states = states
61
62
    async def set_avalible_transitions(self, transitions):
63
        self._avalible_transitions = transitions      
64
65
class ExclusiveLimitStateMachineTypeClass(FiniteStateMachineTypeClass):
66
    def __init__(self, server=None, parent=None, idx=None, name=None):
67
        super().__init__(server, parent)
68
        self._state_machine_type = ua.ObjectIds.ExclusiveLimitStateMachineType
69
        raise NotImplementedError
70
71
class FileTransferStateMachineTypeClass(FiniteStateMachineTypeClass):
72
    def __init__(self, server=None, parent=None, idx=None, name=None):
73
        super().__init__(server, parent)
74
        self._state_machine_type = ua.ObjectIds.FileTransferStateMachineType
75
        raise NotImplementedError
76
77
class ProgramStateMachineTypeClass(FiniteStateMachineTypeClass):
78
    def __init__(self, server=None, parent=None, idx=None, name=None):
79
        super().__init__(server, parent)
80
        self._state_machine_type = ua.ObjectIds.ProgramStateMachineType
81
        self._ready_state = None #State node
82
        self._halted_state = None #State node
83
        self._running_state = None #State node
84
        self._suspended_state = None #State node
85
86
        self._halted_to_ready = None #Transition node
87
        self._ready_to_running = None #Transition node
88
        self._running_to_halted = None #Transition node
89
        self._running_to_ready = None #Transition node
90
        self._running_to_suspended = None #Transition node
91
        self._suspended_to_running = None #Transition node
92
        self._suspended_to_halted = None #Transition node
93
        self._suspended_to_ready = None #Transition node
94
        self._ready_to_halted = None #Transition node
95
96
    async def install(self):
97
        #setup addressspace
98
        # instantiate(parent=self._parent, node_type=self._state_machine_type, nodeid=, bname="StateMachine", dname=, idx= , instantiate_optionals=False)
99
        #maybe better to use create_object()
100
        self._state_machine_node = None
101
        #get childnodes:
102
        self._ready_state = None #State node
103
        self._halted_state = None #State node
104
        self._running_state = None #State node
105
        self._suspended_state = None #State node
106
        self._halted_to_ready = None #Transition node
107
        self._ready_to_running = None #Transition node
108
        self._running_to_halted = None #Transition node
109
        self._running_to_ready = None #Transition node
110
        self._running_to_suspended = None #Transition node
111
        self._suspended_to_running = None #Transition node
112
        self._suspended_to_halted = None #Transition node
113
        self._suspended_to_ready = None #Transition node
114
        self._ready_to_halted = None #Transition node
115
116
    #Transition
117
    async def HaltedToReady(self):
118
        await self._current_state.write_value(ua.LocalizedText("Ready", "en-US"), varianttype=ua.VariantType.LocalizedText)
119
        await self._current_state_id.write_value(self._ready_state.nodeid, varianttype=ua.VariantType.NodeId)
120
        await self._last_transition.write_value(ua.LocalizedText("HaltedToReady", "en-US"), varianttype=ua.VariantType.LocalizedText)
121
        await self._last_transition_id.write_value(self._halted_to_ready.nodeid, varianttype=ua.VariantType.NodeId)
122
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
123
124
    #Transition
125
    async def ReadyToRunning(self):
126
        await self._current_state.write_value(ua.LocalizedText("Running", "en-US"), varianttype=ua.VariantType.LocalizedText)
127
        await self._current_state_id.write_value(self._running_state.nodeid, varianttype=ua.VariantType.NodeId)
128
        await self._last_transition.write_value(ua.LocalizedText("ReadyToRunning", "en-US"), varianttype=ua.VariantType.LocalizedText)
129
        await self._last_transition_id.write_value(self._ready_to_running.nodeid, varianttype=ua.VariantType.NodeId)
130
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
131
132
    #Transition
133
    async def RunningToHalted(self):
134
        await self._current_state.write_value(ua.LocalizedText("Halted", "en-US"), varianttype=ua.VariantType.LocalizedText)
135
        await self._current_state_id.write_value(self._halted_state.nodeid, varianttype=ua.VariantType.NodeId)
136
        await self._last_transition.write_value(ua.LocalizedText("RunningToHalted", "en-US"), varianttype=ua.VariantType.LocalizedText)
137
        await self._last_transition_id.write_value(self._running_to_halted.nodeid, varianttype=ua.VariantType.NodeId)
138
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
139
140
    #Transition
141
    async def RunningToReady(self):
142
        await self._current_state.write_value(ua.LocalizedText("Ready", "en-US"), varianttype=ua.VariantType.LocalizedText)
143
        await self._current_state_id.write_value(self._ready_state.nodeid, varianttype=ua.VariantType.NodeId)
144
        await self._last_transition.write_value(ua.LocalizedText("RunningToReady", "en-US"), varianttype=ua.VariantType.LocalizedText)
145
        await self._last_transition_id.write_value(self._running_to_ready.nodeid, varianttype=ua.VariantType.NodeId)
146
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
147
148
    #Transition
149
    async def RunningToSuspended(self):
150
        await self._current_state.write_value(ua.LocalizedText("Suspended", "en-US"), varianttype=ua.VariantType.LocalizedText)
151
        await self._current_state_id.write_value(self._suspended_state.nodeid, varianttype=ua.VariantType.NodeId)
152
        await self._last_transition.write_value(ua.LocalizedText("RunningToSuspended", "en-US"), varianttype=ua.VariantType.LocalizedText)
153
        await self._last_transition_id.write_value(self._running_to_suspended.nodeid, varianttype=ua.VariantType.NodeId)
154
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
155
156
    #Transition 
157
    async def SuspendedToRunning(self):
158
        await self._current_state.write_value(ua.LocalizedText("Running", "en-US"), varianttype=ua.VariantType.LocalizedText)
159
        await self._current_state_id.write_value(self._running_state.nodeid, varianttype=ua.VariantType.NodeId)
160
        await self._last_transition.write_value(ua.LocalizedText("SuspendedToRunning", "en-US"), varianttype=ua.VariantType.LocalizedText)
161
        await self._last_transition_id.write_value(self._suspended_to_running.nodeid, varianttype=ua.VariantType.NodeId)
162
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
163
164
    #Transition
165
    async def SuspendedToHalted(self):
166
        await self._current_state.write_value(ua.LocalizedText("Halted", "en-US"), varianttype=ua.VariantType.LocalizedText)
167
        await self._current_state_id.write_value(self._halted_state.nodeid, varianttype=ua.VariantType.NodeId)
168
        await self._last_transition.write_value(ua.LocalizedText("SuspendedToHalted", "en-US"), varianttype=ua.VariantType.LocalizedText)
169
        await self._last_transition_id.write_value(self._suspended_to_halted.nodeid, varianttype=ua.VariantType.NodeId)
170
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
171
172
    #Transition
173
    async def SuspendedToReady(self):
174
        await self._current_state.write_value(ua.LocalizedText("Ready", "en-US"), varianttype=ua.VariantType.LocalizedText)
175
        await self._current_state_id.write_value(self._ready_state.nodeid, varianttype=ua.VariantType.NodeId)
176
        await self._last_transition.write_value(ua.LocalizedText("SuspendedToReady", "en-US"), varianttype=ua.VariantType.LocalizedText)
177
        await self._last_transition_id.write_value(self._suspended_to_ready.nodeid, varianttype=ua.VariantType.NodeId)
178
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
179
180
    #Transition 
181
    async def ReadyToHalted(self):
182
        await self._current_state.write_value(ua.LocalizedText("Halted", "en-US"), varianttype=ua.VariantType.LocalizedText)
183
        await self._current_state_id.write_value(self._halted_state.nodeid, varianttype=ua.VariantType.NodeId)
184
        await self._last_transition.write_value(ua.LocalizedText("ReadyToHalted", "en-US"), varianttype=ua.VariantType.LocalizedText)
185
        await self._last_transition_id.write_value(self._ready_to_halted.nodeid, varianttype=ua.VariantType.NodeId)
186
        return ua.StatusCode(ua.status_codes.StatusCodes.Good)
187
188
class ShelvedStateMachineTypeClass(FiniteStateMachineTypeClass):
189
    def __init__(self, server=None, parent=None, idx=None, name=None):
190
        super().__init__(server, parent)
191
        self._state_machine_type = ua.ObjectIds.ShelvedStateMachineType
192
        raise NotImplementedError
193
194
195
196
197
198
#Devtests
199
200
async def main():
201
    logging.basicConfig(level=logging.INFO)
202
    _logger = logging.getLogger('asyncua')
203
204
    server = Server()
205
206
    await server.init()
207
    sm = StateMachineTypeClass(server, server.get_objects_node(), 0, "Test1")
208
    await sm.install()
209
    fsm = FiniteStateMachineTypeClass(server, server.nodes.objects, 0, "Test2")
210
    await fsm.install()
211
212
    async with server:
213
        while 1:
214
            await asyncio.sleep(0)
215
216
if __name__ == "__main__":
217
    asyncio.run(main())
218