Completed
Push — dev ( 72aa02...df6b30 )
by Olivier
06:59
created

opcua._lsprint()   F

Complexity

Conditions 10

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 32
rs 3.1304
cc 10

How to fix   Complexity   

Complexity

Complex classes like opcua._lsprint() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import logging
2
import sys
3
import argparse
4
from opcua import ua, Client
5
import code
6
from enum import Enum
7
8
9
def add_minimum_args(parser):
10
    parser.add_argument("-u",
11
                        "--url",
12
                        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
13
                        default='opc.tcp://localhost:4841',
14
                        metavar="URL")
15
    parser.add_argument("-v",
16
                        "--verbose",
17
                        dest="loglevel",
18
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
19
                        default='WARNING',
20
                        help="Set log level")
21
    parser.add_argument("--timeout",
22
                        dest="timeout",
23
                        type=int,
24
                        default=1,
25
                        help="Set socket timeout (NOT the diverse UA timeouts)")
26
27
28
def add_common_args(parser):
29
    add_minimum_args(parser)
30
    parser.add_argument("-n",
31
                        "--nodeid",
32
                        help="Fully-qualified node ID (for example: i=85). Default: root node",
33
                        default='i=84',
34
                        metavar="NODE")
35
    parser.add_argument("-p",
36
                        "--path",
37
                        help="Comma separated browse path to the node starting at nodeid (for example: 3:Mybject,3:MyVariable)",
38
                        default='',
39
                        metavar="BROWSEPATH")
40
    parser.add_argument("-i",
41
                        "--namespace",
42
                        help="Default namespace",
43
                        type=int,
44
                        default=0,
45
                        metavar="NAMESPACE")
46
47
48
def get_node(client, args):
49
    node = client.get_node(args.nodeid)
50
    if args.path:
51
        node = node.get_child(args.path.split(","))
52
53
54
def uaread():
55
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
56
    add_common_args(parser)
57
    parser.add_argument("-a",
58
                        "--attribute",
59
                        dest="attribute",
60
                        type=int,
61
                        #default="VALUE",
62
                        #choices=['VALUE', 'NODEID', 'BROWSENAME', 'ERROR', 'CRITICAL'],
63
                        default=ua.AttributeIds.Value,
64
                        help="Set attribute to read")
65
    parser.add_argument("-t",
66
                        "--datatype",
67
                        dest="datatype",
68
                        default="python",
69
                        choices=['python', 'variant', 'datavalue'],
70
                        help="Data type to return")
71
72
    args = parser.parse_args()
73
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
74
        parser.print_usage()
75
        print("uaread: error: A NodeId or BrowsePath is required")
76
        sys.exit(1)
77
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
78
79
    client = Client(args.url, timeout=args.timeout)
80
    client.connect()
81
    try:
82
        node = client.get_node(args.nodeid)
83
        if args.path:
84
            node = node.get_child(args.path.split(","))
85
        attr = node.get_attribute(args.attribute)
86
        if args.datatype == "python":
87
            print(attr.Value.Value)
88
        elif args.datatype == "variant":
89
            print(attr.Value)
90
        else:
91
            print(attr)
92
    finally:
93
        client.disconnect()
94
    sys.exit(0)
95
    print(args)
96
97
98
def _args_to_array(val, array):
99
    if array == "guess":
100
        if "," in val:
101
            array = "true"
102
    if array == "true":
103
        val = val.split(",")
104
    return val
105
106
107
def _arg_to_bool(val):
108
    if val in ("true", "True"):
109
        return True
110
    else:
111
        return False
112
113
114
def _arg_to_variant(val, array, ptype, varianttype=None):
115
    val = _args_to_array(val, array)
116
    if isinstance(val, list):
117
        val = [ptype(i) for i in val]
118
    else:
119
        val = ptype(val)
120
    if varianttype:
121
        return ua.Variant(val, varianttype)
122
    else:
123
        return ua.Variant(val)
124
125
126
def _val_to_variant(val, args):
127
    array = args.array
128
    if args.datatype == "guess":
129
        if val in ("true", "True", "false", "False"):
130
            return _arg_to_variant(val, array, _arg_to_bool)
131
        # FIXME: guess bool value
132
        try:
133
            return _arg_to_variant(val, array, int)
134
        except ValueError:
135
            try:
136
                return _arg_to_variant(val, array, float)
137
            except ValueError:
138
                return _arg_to_variant(val, array, str)
139
    elif args.datatype == "bool":
140
        if val in ("1", "True", "true"):
141
            return ua.Variant(True, ua.VariantType.Boolean)
142
        else:
143
            return ua.Variant(False, ua.VariantType.Boolean)
144
    elif args.datatype == "sbyte":
145
        return _arg_to_variant(val, array, int, ua.VariantType.SByte)
146
    elif args.datatype == "byte":
147
        return _arg_to_variant(val, array, int, ua.VariantType.Byte)
148
    #elif args.datatype == "uint8":
149
        #return _arg_to_variant(val, array, int, ua.VariantType.Byte)
150
    elif args.datatype == "uint16":
151
        return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
152
    elif args.datatype == "uint32":
153
        return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
154
    elif args.datatype == "uint64":
155
        return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
156
    #elif args.datatype == "int8":
157
        #return ua.Variant(int(val), ua.VariantType.Int8)
158
    elif args.datatype == "int16":
159
        return _arg_to_variant(val, array, int, ua.VariantType.Int16)
160
    elif args.datatype == "int32":
161
        return _arg_to_variant(val, array, int, ua.VariantType.Int32)
162
    elif args.datatype == "int64":
163
        return _arg_to_variant(val, array, int, ua.VariantType.Int64)
164
    elif args.datatype == "float":
165
        return _arg_to_variant(val, array, float, ua.VariantType.Float)
166
    elif args.datatype == "double":
167
        return _arg_to_variant(val, array, float, ua.VariantType.Double)
168
    elif args.datatype == "string":
169
        return _arg_to_variant(val, array, str, ua.VariantType.String)
170
    elif args.datatype == "datetime":
171
        raise NotImplementedError
172
    elif args.datatype == "Guid":
173
        return _arg_to_variant(val, array, bytes, ua.VariantType.Guid)
174
    elif args.datatype == "ByteString":
175
        return _arg_to_variant(val, array, bytes, ua.VariantType.ByteString)
176
    elif args.datatype == "xml":
177
        return _arg_to_variant(val, array, str, ua.VariantType.XmlElement)
178
    elif args.datatype == "nodeid":
179
        return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
180
    elif args.datatype == "expandednodeid":
181
        return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
182
    elif args.datatype == "statuscode":
183
        return _arg_to_variant(val, array, int, ua.VariantType.StatusCode)
184
    elif args.datatype in ("qualifiedname", "browsename"):
185
        return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
186
    elif args.datatype == "LocalizedText":
187
        return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantTypeLocalizedText)
188
189
190
def uawrite():
191
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
192
    add_common_args(parser)
193
    parser.add_argument("-a",
194
                        "--attribute",
195
                        dest="attribute",
196
                        type=int,
197
                        #default="VALUE",
198
                        #choices=['VALUE', 'NODEID', 'BROWSENAME', 'ERROR', 'CRITICAL'],
199
                        default=ua.AttributeIds.Value,
200
                        help="Set attribute to read")
201
    parser.add_argument("-l",
202
                        "--list",
203
                        "--array",
204
                        dest="array",
205
                        default="guess",
206
                        choices=["guess", "true", "false"],
207
                        help="Value is an array")
208
    parser.add_argument("-t",
209
                        "--datatype",
210
                        dest="datatype",
211
                        default="guess",
212
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],  
213
                        help="Data type to return")
214
    parser.add_argument("value",
215
                        help="Value to be written",
216
                        metavar="VALUE")
217
    args = parser.parse_args()
218
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
219
        parser.print_usage()
220
        print("uaread: error: A NodeId or BrowsePath is required")
221
        sys.exit(1)
222
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
223
224
    client = Client(args.url, timeout=args.timeout)
225
    client.connect()
226
    try:
227
        node = client.get_node(args.nodeid)
228
        if args.path:
229
            node = node.get_child(args.path.split(","))
230
        val = _val_to_variant(args.value, args)
231
        node.set_attribute(args.attribute, ua.DataValue(val))
232
    finally:
233
        client.disconnect()
234
    sys.exit(0)
235
    print(args)
236
237
238
def uals():
239
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
240
    add_common_args(parser)
241
    parser.add_argument("-l",
242
                        dest="print_format",
243
                        #action="store_true",
244
                        default=1,
245
                        nargs="?",
246
                        type=int,
247
                        help="use a long listing format")
248
    parser.add_argument("-d",
249
                        "--depth",
250
                        default=1,
251
                        type=int,
252
                        help="Browse depth")
253
254
    args = parser.parse_args()
255
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
256
257
    client = Client(args.url, timeout=args.timeout)
258
    client.connect()
259
    try:
260
        node = client.get_node(args.nodeid)
261
        if args.path:
262
            node = node.get_child(args.path.split(","))
263
        print("Browsing node {} at {}\n".format(node, args.url))
264
        if args.print_format == 0:
265
            print("{:30} {:25}".format("DisplayName", "NodeId"))
266
            print("")
267
        elif args.print_format == 1:
268
            print("{:30} {:25} {:25} {:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
269
            print("")
270
        _lsprint(client, node.nodeid, args.print_format, args.depth - 1)
271
272
    finally:
273
        client.disconnect()
274
    sys.exit(0)
275
    print(args)
276
277
278
def _lsprint(client, nodeid, print_format, depth, indent=""):
279
    indent += "  "
280
    pnode = client.get_node(nodeid)
281
    if print_format in (0, 1):
282
        for desc in pnode.get_children_descriptions():
283
            if print_format == 0:
284
                print("{:30} {:25}".format(indent + desc.DisplayName.to_string(), indent + desc.NodeId.to_string()))
285
            else:
286
                if desc.NodeClass == ua.NodeClass.Variable:
287
                    val = client.get_node(desc.NodeId).get_value()
288
                    print("{:30}, {!s:25}, {!s:25}, {!s:3}".format(indent + desc.DisplayName.to_string(), indent + desc.NodeId.to_string(), indent + desc.BrowseName.to_string(), indent + str(val)))
289
                else:
290
                    print("{:30}, {!s:25}, {!s:25}".format(indent + desc.DisplayName.to_string(), indent + desc.NodeId.to_string(), indent + desc.BrowseName.to_string()))
291
            if depth:
292
                _lsprint(client, desc.NodeId, print_format, depth - 1, indent)
293
    else:
294
        for node in pnode.get_children():
295
            attrs = node.get_attributes([ua.AttributeIds.DisplayName, 
296
                                         ua.AttributeIds.BrowseName,
297
                                         ua.AttributeIds.NodeClass,
298
                                         ua.AttributeIds.WriteMask,
299
                                         ua.AttributeIds.UserWriteMask,
300
                                         ua.AttributeIds.DataType,
301
                                         ua.AttributeIds.Value])
302
            name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
303
            update = attrs[-1].ServerTimestamp
304
            if nclass == ua.NodeClass.Variable:
305
                print("{} {}, {}, {}, {}, {update}, {value}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update=update, value=val))
306
            else:
307
                print("{} {}, {}, {}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
308
            if depth:
309
                _lsprint(client, node.nodeid, print_format, depth - 1, indent)
310
311
312
class SubHandler(object):
313
314
    def data_change(self, handle, node, val, attr):
315
        print("New data change event", handle, node, val, attr)
316
317
    def event(self, handle, event):
318
        print("New event", handle, event)
319
320
321
def uasubscribe():
322
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
323
    add_common_args(parser)
324
    parser.add_argument("-t",
325
                        "--eventtype",
326
                        dest="eventtype",
327
                        default="datachange",
328
                        choices=['datachange', 'event'],
329
                        help="Event type to subscribe to")
330
331
    args = parser.parse_args()
332
    if args.nodeid == "i=84" and args.path == "":
333
        parser.print_usage()
334
        print("uaread: error: The NodeId or BrowsePath of a variable is required")
335
        sys.exit(1)
336
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
337
338
    client = Client(args.url, timeout=args.timeout)
339
    client.connect()
340
    try:
341
        node = client.get_node(args.nodeid)
342
        if args.path:
343
            node = node.get_child(args.path.split(","))
344
        handler = SubHandler()
345
        sub = client.create_subscription(500, handler)
346
        if args.eventtype == "datachange":
347
            sub.subscribe_data_change(node)
348
        else:
349
            sub.subscribe_events(node)
350
        glbs = globals()
351
        glbs.update(locals())
352
        shell = code.InteractiveConsole(vars)
353
        shell.interact()
354
    finally:
355
        client.disconnect()
356
    sys.exit(0)
357
    print(args)
358
359
360
# converts numeric value to its enum name.
361
def enum_to_string(klass, value):
362
    if isinstance(value, Enum):
363
        return value.name
364
    # if value is not a subtype of Enum, try to find a constant
365
    # with this value in this class
366
    for k, v in vars(klass).items():
367
        if not k.startswith('__') and v == value:
368
            return k
369
    return 'Unknown {} ({})'.format(klass.__name__, value)
370
371
372
def application_to_strings(app):
373
    result = []
374
    result.append(('Application URI', app.ApplicationUri))
375
    optionals = [
376
        ('Product URI', app.ProductUri),
377
        ('Application Name', app.ApplicationName.to_string()),
378
        ('Application Type', enum_to_string(ua.ApplicationType, app.ApplicationType)),
379
        ('Gateway Server URI', app.GatewayServerUri),
380
        ('Discovery Profile URI', app.DiscoveryProfileUri),
381
    ]
382
    for (n, v) in optionals:
383
        if v:
384
            result.append((n, v))
385
    for url in app.DiscoveryUrls:
386
        result.append(('Discovery URL', url))
387
    return result  # ['{}: {}'.format(n, v) for (n, v) in result]
388
389
390
def endpoint_to_strings(ep):
391
    result = [('Endpoint URL', ep.EndpointUrl)]
392
    result += application_to_strings(ep.Server)
393
    result += [
394
        ('Server Certificate', len(ep.ServerCertificate)),
395
        ('Security Mode', enum_to_string(ua.MessageSecurityMode, ep.SecurityMode)),
396
        ('Security Policy URI', ep.SecurityPolicyUri)]
397
    for tok in ep.UserIdentityTokens:
398
        result += [
399
            ('User policy', tok.PolicyId),
400
            ('  Token type', enum_to_string(ua.UserTokenType, tok.TokenType))]
401
        if tok.IssuedTokenType or tok.IssuerEndpointUrl:
402
            result += [
403
                ('  Issued Token type', tok.IssuedTokenType),
404
                ('  Issuer Endpoint URL', tok.IssuerEndpointUrl)]
405
        if tok.SecurityPolicyUri:
406
            result.append(('  Security Policy URI', tok.SecurityPolicyUri))
407
    result += [
408
        ('Transport Profile URI', ep.TransportProfileUri),
409
        ('Security Level', ep.SecurityLevel)]
410
    return result
411
412
413
def uadiscover():
414
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
415
    add_minimum_args(parser)
416
    args = parser.parse_args()
417
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
418
419
    client = Client(args.url, timeout=args.timeout)
420
    for i, server in enumerate(client.find_all_servers(), start=1):
421
        print('Server {}:'.format(i))
422
        for (n, v) in application_to_strings(server):
423
            print('  {}: {}'.format(n, v))
424
        print('')
425
426
    client = Client(args.url, timeout=args.timeout)
427
    for i, ep in enumerate(client.get_server_endpoints()):
428
        print('Endpoint {}:'.format(i))
429
        for (n, v) in endpoint_to_strings(ep):
430
            print('  {}: {}'.format(n, v))
431
        print('')
432
433
    sys.exit(0)
434
435
436