Passed
Branch dev (949ba8)
by Olivier
03:17 queued 01:08
created

opcua._lsprint_0()   A

Complexity

Conditions 4

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 8
rs 9.2
1
import logging
2
import sys
3
import argparse
4
from datetime import datetime
5
import code
6
from enum import Enum
7
8
from opcua import ua
9
from opcua import Client
10
from opcua import Node
11
12
13
def add_minimum_args(parser):
14
    parser.add_argument("-u",
15
                        "--url",
16
                        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
17
                        default='opc.tcp://localhost:4841',
18
                        metavar="URL")
19
    parser.add_argument("-v",
20
                        "--verbose",
21
                        dest="loglevel",
22
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
23
                        default='WARNING',
24
                        help="Set log level")
25
    parser.add_argument("--timeout",
26
                        dest="timeout",
27
                        type=int,
28
                        default=1,
29
                        help="Set socket timeout (NOT the diverse UA timeouts)")
30
31
32
def add_common_args(parser):
33
    add_minimum_args(parser)
34
    parser.add_argument("-n",
35
                        "--nodeid",
36
                        help="Fully-qualified node ID (for example: i=85). Default: root node",
37
                        default='i=84',
38
                        metavar="NODE")
39
    parser.add_argument("-p",
40
                        "--path",
41
                        help="Comma separated browse path to the node starting at nodeid (for example: 3:Mybject,3:MyVariable)",
42
                        default='',
43
                        metavar="BROWSEPATH")
44
    parser.add_argument("-i",
45
                        "--namespace",
46
                        help="Default namespace",
47
                        type=int,
48
                        default=0,
49
                        metavar="NAMESPACE")
50
51
52
def get_node(client, args):
53
    node = client.get_node(args.nodeid)
54
    if args.path:
55
        node = node.get_child(args.path.split(","))
56
57
58
def uaread():
59
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
60
    add_common_args(parser)
61
    parser.add_argument("-a",
62
                        "--attribute",
63
                        dest="attribute",
64
                        type=int,
65
                        #default="VALUE",
66
                        #choices=['VALUE', 'NODEID', 'BROWSENAME', 'ERROR', 'CRITICAL'],
67
                        default=ua.AttributeIds.Value,
68
                        help="Set attribute to read")
69
    parser.add_argument("-t",
70
                        "--datatype",
71
                        dest="datatype",
72
                        default="python",
73
                        choices=['python', 'variant', 'datavalue'],
74
                        help="Data type to return")
75
76
    args = parser.parse_args()
77
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
78
        parser.print_usage()
79
        print("uaread: error: A NodeId or BrowsePath is required")
80
        sys.exit(1)
81
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
82
83
    client = Client(args.url, timeout=args.timeout)
84
    client.connect()
85
    try:
86
        node = client.get_node(args.nodeid)
87
        if args.path:
88
            node = node.get_child(args.path.split(","))
89
        attr = node.get_attribute(args.attribute)
90
        if args.datatype == "python":
91
            print(attr.Value.Value)
92
        elif args.datatype == "variant":
93
            print(attr.Value)
94
        else:
95
            print(attr)
96
    finally:
97
        client.disconnect()
98
    sys.exit(0)
99
    print(args)
100
101
102
def _args_to_array(val, array):
103
    if array == "guess":
104
        if "," in val:
105
            array = "true"
106
    if array == "true":
107
        val = val.split(",")
108
    return val
109
110
111
def _arg_to_bool(val):
112
    if val in ("true", "True"):
113
        return True
114
    else:
115
        return False
116
117
118
def _arg_to_variant(val, array, ptype, varianttype=None):
119
    val = _args_to_array(val, array)
120
    if isinstance(val, list):
121
        val = [ptype(i) for i in val]
122
    else:
123
        val = ptype(val)
124
    if varianttype:
125
        return ua.Variant(val, varianttype)
126
    else:
127
        return ua.Variant(val)
128
129
130
def _val_to_variant(val, args):
131
    array = args.array
132
    if args.datatype == "guess":
133
        if val in ("true", "True", "false", "False"):
134
            return _arg_to_variant(val, array, _arg_to_bool)
135
        # FIXME: guess bool value
136
        try:
137
            return _arg_to_variant(val, array, int)
138
        except ValueError:
139
            try:
140
                return _arg_to_variant(val, array, float)
141
            except ValueError:
142
                return _arg_to_variant(val, array, str)
143
    elif args.datatype == "bool":
144
        if val in ("1", "True", "true"):
145
            return ua.Variant(True, ua.VariantType.Boolean)
146
        else:
147
            return ua.Variant(False, ua.VariantType.Boolean)
148
    elif args.datatype == "sbyte":
149
        return _arg_to_variant(val, array, int, ua.VariantType.SByte)
150
    elif args.datatype == "byte":
151
        return _arg_to_variant(val, array, int, ua.VariantType.Byte)
152
    #elif args.datatype == "uint8":
153
        #return _arg_to_variant(val, array, int, ua.VariantType.Byte)
154
    elif args.datatype == "uint16":
155
        return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
156
    elif args.datatype == "uint32":
157
        return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
158
    elif args.datatype == "uint64":
159
        return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
160
    #elif args.datatype == "int8":
161
        #return ua.Variant(int(val), ua.VariantType.Int8)
162
    elif args.datatype == "int16":
163
        return _arg_to_variant(val, array, int, ua.VariantType.Int16)
164
    elif args.datatype == "int32":
165
        return _arg_to_variant(val, array, int, ua.VariantType.Int32)
166
    elif args.datatype == "int64":
167
        return _arg_to_variant(val, array, int, ua.VariantType.Int64)
168
    elif args.datatype == "float":
169
        return _arg_to_variant(val, array, float, ua.VariantType.Float)
170
    elif args.datatype == "double":
171
        return _arg_to_variant(val, array, float, ua.VariantType.Double)
172
    elif args.datatype == "string":
173
        return _arg_to_variant(val, array, str, ua.VariantType.String)
174
    elif args.datatype == "datetime":
175
        raise NotImplementedError
176
    elif args.datatype == "Guid":
177
        return _arg_to_variant(val, array, bytes, ua.VariantType.Guid)
178
    elif args.datatype == "ByteString":
179
        return _arg_to_variant(val, array, bytes, ua.VariantType.ByteString)
180
    elif args.datatype == "xml":
181
        return _arg_to_variant(val, array, str, ua.VariantType.XmlElement)
182
    elif args.datatype == "nodeid":
183
        return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
184
    elif args.datatype == "expandednodeid":
185
        return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
186
    elif args.datatype == "statuscode":
187
        return _arg_to_variant(val, array, int, ua.VariantType.StatusCode)
188
    elif args.datatype in ("qualifiedname", "browsename"):
189
        return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
190
    elif args.datatype == "LocalizedText":
191
        return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantTypeLocalizedText)
192
193
194
def uawrite():
195
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
196
    add_common_args(parser)
197
    parser.add_argument("-a",
198
                        "--attribute",
199
                        dest="attribute",
200
                        type=int,
201
                        #default="VALUE",
202
                        #choices=['VALUE', 'NODEID', 'BROWSENAME', 'ERROR', 'CRITICAL'],
203
                        default=ua.AttributeIds.Value,
204
                        help="Set attribute to read")
205
    parser.add_argument("-l",
206
                        "--list",
207
                        "--array",
208
                        dest="array",
209
                        default="guess",
210
                        choices=["guess", "true", "false"],
211
                        help="Value is an array")
212
    parser.add_argument("-t",
213
                        "--datatype",
214
                        dest="datatype",
215
                        default="guess",
216
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],  
217
                        help="Data type to return")
218
    parser.add_argument("value",
219
                        help="Value to be written",
220
                        metavar="VALUE")
221
    args = parser.parse_args()
222
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
223
        parser.print_usage()
224
        print("uaread: error: A NodeId or BrowsePath is required")
225
        sys.exit(1)
226
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
227
228
    client = Client(args.url, timeout=args.timeout)
229
    client.connect()
230
    try:
231
        node = client.get_node(args.nodeid)
232
        if args.path:
233
            node = node.get_child(args.path.split(","))
234
        val = _val_to_variant(args.value, args)
235
        node.set_attribute(args.attribute, ua.DataValue(val))
236
    finally:
237
        client.disconnect()
238
    sys.exit(0)
239
    print(args)
240
241
242
def uals():
243
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
244
    add_common_args(parser)
245
    parser.add_argument("-l",
246
                        dest="long_format",
247
                        #action="store_true",
248
                        const=3,
249
                        nargs="?",
250
                        type=int,
251
                        help="use a long listing format")
252
    parser.add_argument("-d",
253
                        "--depth",
254
                        default=1,
255
                        type=int,
256
                        help="Browse depth")
257
258
    args = parser.parse_args()
259
    if args.long_format is None:
260
        args.long_format = 1
261
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
262
263
    client = Client(args.url, timeout=args.timeout)
264
    client.connect()
265
    try:
266
        node = client.get_node(args.nodeid)
267
        if args.path:
268
            node = node.get_child(args.path.split(","))
269
        print("Browsing node {} at {}\n".format(node, args.url))
270
        if args.long_format == 0:
271
            _lsprint_0(node, args.depth - 1)
272
        elif args.long_format == 1:
273
            _lsprint_1(node, args.depth - 1)
274
        else:
275
            _lsprint_long(node, args.depth - 1)
276
    finally:
277
        client.disconnect()
278
    sys.exit(0)
279
    print(args)
280
281
282
def _lsprint_0(node, depth, indent=""):
283
    if not indent:
284
        print("{:30} {:25}".format("DisplayName", "NodeId"))
285
        print("")
286
    for desc in node.get_children_descriptions():
287
        print("{}{:30} {:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
288
        if depth:
289
            _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
290
291
292
def _lsprint_1(node, depth, indent=""):
293
    if not indent:
294
        print("{:30} {:25} {:25} {:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
295
        print("")
296
297
    for desc in node.get_children_descriptions():
298
        if desc.NodeClass == ua.NodeClass.Variable:
299
            val = Node(node.server, desc.NodeId).get_value()
300
            print("{}{:30} {!s:25} {!s:25}, {!s:3}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string(), val))
301
        else:
302
            print("{}{:30} {!s:25} {!s:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string()))
303
        if depth:
304
            _lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
305
306
307
def _lsprint_long(pnode, depth, indent=""):
308
    if not indent:
309
        print("{:30} {:25} {:25} {:10} {:30} {:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
310
        print("")
311
    for node in pnode.get_children():
312
        attrs = node.get_attributes([ua.AttributeIds.DisplayName, 
313
                                     ua.AttributeIds.BrowseName,
314
                                     ua.AttributeIds.NodeClass,
315
                                     ua.AttributeIds.WriteMask,
316
                                     ua.AttributeIds.UserWriteMask,
317
                                     ua.AttributeIds.DataType,
318
                                     ua.AttributeIds.Value])
319
        name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
320
        update = attrs[-1].ServerTimestamp
321
        if nclass == ua.NodeClass.Variable:
322
            print("{}{:30} {:25} {:25} {:10} {!s:30} {!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
323
        else:
324
            print("{}{:30} {:25} {:25}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
325
        if depth:
326
            _lsprint_long(node, depth - 1, indent + "  ")
327
328
329
class SubHandler(object):
330
331
    def data_change(self, handle, node, val, attr):
332
        print("New data change event", handle, node, val, attr)
333
334
    def event(self, handle, event):
335
        print("New event", handle, event)
336
337
338
def uasubscribe():
339
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
340
    add_common_args(parser)
341
    parser.add_argument("-t",
342
                        "--eventtype",
343
                        dest="eventtype",
344
                        default="datachange",
345
                        choices=['datachange', 'event'],
346
                        help="Event type to subscribe to")
347
348
    args = parser.parse_args()
349
    if args.nodeid == "i=84" and args.path == "":
350
        parser.print_usage()
351
        print("uaread: error: The NodeId or BrowsePath of a variable is required")
352
        sys.exit(1)
353
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
354
355
    client = Client(args.url, timeout=args.timeout)
356
    client.connect()
357
    try:
358
        node = client.get_node(args.nodeid)
359
        if args.path:
360
            node = node.get_child(args.path.split(","))
361
        handler = SubHandler()
362
        sub = client.create_subscription(500, handler)
363
        if args.eventtype == "datachange":
364
            sub.subscribe_data_change(node)
365
        else:
366
            sub.subscribe_events(node)
367
        glbs = globals()
368
        glbs.update(locals())
369
        shell = code.InteractiveConsole(vars)
370
        shell.interact()
371
    finally:
372
        client.disconnect()
373
    sys.exit(0)
374
    print(args)
375
376
377
# converts numeric value to its enum name.
378
def enum_to_string(klass, value):
379
    if isinstance(value, Enum):
380
        return value.name
381
    # if value is not a subtype of Enum, try to find a constant
382
    # with this value in this class
383
    for k, v in vars(klass).items():
384
        if not k.startswith('__') and v == value:
385
            return k
386
    return 'Unknown {} ({})'.format(klass.__name__, value)
387
388
389
def application_to_strings(app):
390
    result = []
391
    result.append(('Application URI', app.ApplicationUri))
392
    optionals = [
393
        ('Product URI', app.ProductUri),
394
        ('Application Name', app.ApplicationName.to_string()),
395
        ('Application Type', enum_to_string(ua.ApplicationType, app.ApplicationType)),
396
        ('Gateway Server URI', app.GatewayServerUri),
397
        ('Discovery Profile URI', app.DiscoveryProfileUri),
398
    ]
399
    for (n, v) in optionals:
400
        if v:
401
            result.append((n, v))
402
    for url in app.DiscoveryUrls:
403
        result.append(('Discovery URL', url))
404
    return result  # ['{}: {}'.format(n, v) for (n, v) in result]
405
406
407
def endpoint_to_strings(ep):
408
    result = [('Endpoint URL', ep.EndpointUrl)]
409
    result += application_to_strings(ep.Server)
410
    result += [
411
        ('Server Certificate', len(ep.ServerCertificate)),
412
        ('Security Mode', enum_to_string(ua.MessageSecurityMode, ep.SecurityMode)),
413
        ('Security Policy URI', ep.SecurityPolicyUri)]
414
    for tok in ep.UserIdentityTokens:
415
        result += [
416
            ('User policy', tok.PolicyId),
417
            ('  Token type', enum_to_string(ua.UserTokenType, tok.TokenType))]
418
        if tok.IssuedTokenType or tok.IssuerEndpointUrl:
419
            result += [
420
                ('  Issued Token type', tok.IssuedTokenType),
421
                ('  Issuer Endpoint URL', tok.IssuerEndpointUrl)]
422
        if tok.SecurityPolicyUri:
423
            result.append(('  Security Policy URI', tok.SecurityPolicyUri))
424
    result += [
425
        ('Transport Profile URI', ep.TransportProfileUri),
426
        ('Security Level', ep.SecurityLevel)]
427
    return result
428
429
430
def uadiscover():
431
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
432
    add_minimum_args(parser)
433
    args = parser.parse_args()
434
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
435
436
    client = Client(args.url, timeout=args.timeout)
437
    print("Performing discovery at {}\n".format(args.url))
438
    for i, server in enumerate(client.find_all_servers(), start=1):
439
        print('Server {}:'.format(i))
440
        for (n, v) in application_to_strings(server):
441
            print('  {}: {}'.format(n, v))
442
        print('')
443
444
    client = Client(args.url, timeout=args.timeout)
445
    for i, ep in enumerate(client.get_server_endpoints(), start=1):
446
        print('Endpoint {}:'.format(i))
447
        for (n, v) in endpoint_to_strings(ep):
448
            print('  {}: {}'.format(n, v))
449
        print('')
450
451
    sys.exit(0)
452
453
454
def print_history(o):
455
    if isinstance(o, ua.HistoryData):
456
        print("{:30} {:10} {}".format('Source timestamp', 'Status', 'Value'))
457
        for d in o.DataValues:
458
            print("{:30} {:10} {}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
459
460
461
def str_to_datetime(s):
462
    if not s:
463
        return datetime.utcnow()
464
    # try different datetime formats
465
    for fmt in ["%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"]:
466
        try:
467
            return datetime.strptime(s, fmt)
468
        except ValueError:
469
            pass
470
471
472
def uahistoryread():
473
    parser = argparse.ArgumentParser(description="Read history of a node")
474
    add_common_args(parser)
475
    parser.add_argument("--starttime",
476
                        default="",
477
                        help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
478
    parser.add_argument("--endtime",
479
                        default="",
480
                        help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
481
482
    args = parser.parse_args()
483
    if args.nodeid == "i=84" and args.path == "":
484
        parser.print_usage()
485
        print("uahistoryread: error: A NodeId or BrowsePath is required")
486
        sys.exit(1)
487
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
488
489
    client = Client(args.url, timeout=args.timeout)
490
    client.connect()
491
    try:
492
        node = client.get_node(args.nodeid)
493
        if args.path:
494
            node = node.get_child(args.path.split(","))
495
        starttime = str_to_datetime(args.starttime)
496
        endtime = str_to_datetime(args.endtime)
497
        print("Reading raw history of node {} at {}; start at {}, end at {}\n".format(node, args.url, starttime, endtime))
498
        print_history(node.read_raw_history(starttime, endtime))
499
    finally:
500
        client.disconnect()
501
    sys.exit(0)
502