Completed
Push — dev ( 7c3457...b4ea62 )
by Olivier
04:07
created

opcua.print_history()   A

Complexity

Conditions 3

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 6
rs 9.4286
cc 3
1
import logging
2
import sys
3
import argparse
4
from opcua import ua, Client
5
import code
6
from enum import Enum
7
from datetime import datetime
8
9
10
def add_minimum_args(parser):
    """Register the options shared by every command-line tool in this module.

    Adds ``-u/--url``, ``-v/--verbose`` (stored as ``loglevel``) and
    ``--timeout`` to *parser*.

    :param parser: the ``argparse.ArgumentParser`` to extend in place.
    """
    log_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']

    parser.add_argument(
        "-u", "--url",
        metavar="URL",
        default='opc.tcp://localhost:4841',
        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
    )
    parser.add_argument(
        "-v", "--verbose",
        dest="loglevel",
        choices=log_levels,
        default='WARNING',
        help="Set log level",
    )
    parser.add_argument(
        "--timeout",
        dest="timeout",
        type=int,
        default=1,
        help="Set socket timeout (NOT the diverse UA timeouts)",
    )
27
28
29
def add_common_args(parser):
    """Register the node-selection options on top of the minimum options.

    Calls ``add_minimum_args`` and then adds ``-n/--nodeid``, ``-p/--path``
    and ``-i/--namespace`` to *parser*.

    :param parser: the ``argparse.ArgumentParser`` to extend in place.
    """
    add_minimum_args(parser)

    parser.add_argument(
        "-n", "--nodeid",
        metavar="NODE",
        default='i=84',
        help="Fully-qualified node ID (for example: i=85). Default: root node",
    )
    parser.add_argument(
        "-p", "--path",
        metavar="BROWSEPATH",
        default='',
        help="Comma separated browse path to the node starting at nodeid (for example: 3:Mybject,3:MyVariable)",
    )
    parser.add_argument(
        "-i", "--namespace",
        metavar="NAMESPACE",
        type=int,
        default=0,
        help="Default namespace",
    )
47
48
49
def get_node(client, args):
    """Resolve the node selected by the ``nodeid`` and ``path`` options.

    :param client: object providing ``get_node(nodeid)``.
    :param args: parsed arguments with ``nodeid`` and ``path`` attributes;
        ``path`` is a comma separated browse path, or empty for none.
    :returns: the node for ``args.nodeid``, or the child reached by following
        ``args.path`` from it when a path is given.
    """
    node = client.get_node(args.nodeid)
    if args.path:
        node = node.get_child(args.path.split(","))
    # The original dropped the result, so callers always received None.
    return node
53
54
55
def uaread():
    """Command-line entry point: read one attribute of a node and print it.

    Parses the command line, connects to the server given by ``--url``,
    resolves the node from ``--nodeid`` and the optional ``--path``, reads the
    attribute selected with ``--attribute`` and prints it in the form chosen
    with ``--datatype``.

    Always terminates the process through ``sys.exit``: status 1 when no node
    was selected, status 0 after a successful read.
    """
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to read")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="python",
                        choices=['python', 'variant', 'datavalue'],
                        help="Data type to return")

    args = parser.parse_args()
    # All three options still at their defaults means the user selected nothing to read.
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
        parser.print_usage()
        print("uaread: error: A NodeId or BrowsePath is required")
        sys.exit(1)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = client.get_node(args.nodeid)
        if args.path:
            node = node.get_child(args.path.split(","))
        attr = node.get_attribute(args.attribute)
        if args.datatype == "python":
            print(attr.Value.Value)
        elif args.datatype == "variant":
            print(attr.Value)
        else:
            print(attr)
    finally:
        # Disconnect even when resolving or reading the node failed.
        client.disconnect()
    sys.exit(0)
97
98
99
def _args_to_array(val, array):
100
    if array == "guess":
101
        if "," in val:
102
            array = "true"
103
    if array == "true":
104
        val = val.split(",")
105
    return val
106
107
108
def _arg_to_bool(val):
109
    if val in ("true", "True"):
110
        return True
111
    else:
112
        return False
113
114
115
def _arg_to_variant(val, array, ptype, varianttype=None):
    """Convert a command-line string into a ``ua.Variant``.

    :param val: the raw string value.
    :param array: array mode forwarded to ``_args_to_array``.
    :param ptype: callable applied to the value, or to each element when the
        value was split into a list.
    :param varianttype: optional variant type passed to ``ua.Variant``; when
        falsy the variant is built from the converted value alone.
    :returns: the resulting ``ua.Variant``.
    """
    parsed = _args_to_array(val, array)
    if isinstance(parsed, list):
        converted = [ptype(item) for item in parsed]
    else:
        converted = ptype(parsed)

    if not varianttype:
        return ua.Variant(converted)
    return ua.Variant(converted, varianttype)
125
126
127
def _str_to_bytes(val):
    """Return *val* as bytes, UTF-8 encoding it when it is a text string."""
    if isinstance(val, bytes):
        return val
    return val.encode("utf-8")


def _val_to_variant(val, args):
    """Convert the command-line value *val* to a ``ua.Variant``.

    :param val: the raw string given on the command line.
    :param args: parsed arguments; ``args.datatype`` names the target type
        (matched case-insensitively) and ``args.array`` is the array mode
        forwarded to ``_arg_to_variant``.
    :returns: the ``ua.Variant`` holding the converted value.
    :raises NotImplementedError: for the ``datetime`` datatype.
    :raises ValueError: when the value cannot be converted, or when the
        datatype is not one this function knows.
    """
    array = args.array
    # Compare case-insensitively so both the lowercase names offered on the
    # command line and the mixed-case names used historically are accepted.
    datatype = args.datatype.lower()

    if datatype == "guess":
        if val in ("true", "True", "false", "False"):
            return _arg_to_variant(val, array, _arg_to_bool)
        # Try the narrowest numeric type first and fall back to a string.
        for ptype in (int, float):
            try:
                return _arg_to_variant(val, array, ptype)
            except ValueError:
                pass
        return _arg_to_variant(val, array, str)

    if datatype == "bool":
        # Unlike the other types, a bool is never treated as an array.
        return ua.Variant(val in ("1", "True", "true"), ua.VariantType.Boolean)

    if datatype == "datetime":
        raise NotImplementedError

    converters = {
        "sbyte": (int, ua.VariantType.SByte),
        "byte": (int, ua.VariantType.Byte),
        "uint16": (int, ua.VariantType.UInt16),
        "uint32": (int, ua.VariantType.UInt32),
        "uint64": (int, ua.VariantType.UInt64),
        "int16": (int, ua.VariantType.Int16),
        "int32": (int, ua.VariantType.Int32),
        "int64": (int, ua.VariantType.Int64),
        "float": (float, ua.VariantType.Float),
        "double": (float, ua.VariantType.Double),
        "string": (str, ua.VariantType.String),
        # NOTE(review): the original converted Guid values with bytes();
        # confirm that ua.Variant accepts raw bytes for VariantType.Guid.
        "guid": (_str_to_bytes, ua.VariantType.Guid),
        "bytestring": (_str_to_bytes, ua.VariantType.ByteString),
        "xml": (str, ua.VariantType.XmlElement),
        "xmlelement": (str, ua.VariantType.XmlElement),
        "nodeid": (ua.NodeId.from_string, ua.VariantType.NodeId),
        "expandednodeid": (ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId),
        "statuscode": (int, ua.VariantType.StatusCode),
        "qualifiedname": (ua.QualifiedName.from_string, ua.VariantType.QualifiedName),
        "browsename": (ua.QualifiedName.from_string, ua.VariantType.QualifiedName),
        "localizedtext": (ua.LocalizedText, ua.VariantType.LocalizedText),
    }
    entry = converters.get(datatype)
    if entry is None:
        # Previously an unknown datatype fell through and returned None.
        raise ValueError("Unsupported datatype: {}".format(args.datatype))
    ptype, varianttype = entry
    return _arg_to_variant(val, array, ptype, varianttype)
189
190
191
def uawrite():
    """Command-line entry point: write a value to one attribute of a node.

    Parses the command line, connects to the server given by ``--url``,
    resolves the node from ``--nodeid`` and the optional ``--path``, converts
    the positional ``VALUE`` with ``_val_to_variant`` and writes it to the
    attribute selected with ``--attribute``.

    Always terminates the process through ``sys.exit``: status 1 when no node
    was selected, status 0 after a successful write.
    """
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to write")
    parser.add_argument("-l",
                        "--list",
                        "--array",
                        dest="array",
                        default="guess",
                        choices=["guess", "true", "false"],
                        help="Value is an array")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="guess",
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
                        help="Data type of the value to write")
    parser.add_argument("value",
                        help="Value to be written",
                        metavar="VALUE")
    args = parser.parse_args()
    # All three options still at their defaults means the user selected nothing to write to.
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
        parser.print_usage()
        print("uawrite: error: A NodeId or BrowsePath is required")
        sys.exit(1)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = client.get_node(args.nodeid)
        if args.path:
            node = node.get_child(args.path.split(","))
        val = _val_to_variant(args.value, args)
        node.set_attribute(args.attribute, ua.DataValue(val))
    finally:
        # Disconnect even when the conversion or the write failed.
        client.disconnect()
    sys.exit(0)
237
238
239
def uals():
    """Command-line entry point: browse a node and print its children.

    Parses the command line, connects to the server given by ``--url``,
    resolves the node from ``--nodeid`` and the optional ``--path``, prints a
    column header for print formats 0 and 1, and delegates the listing to
    ``_lsprint`` with a remaining depth of ``--depth`` minus one.

    Always terminates the process through ``sys.exit`` with status 0 once the
    listing completed.
    """
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
    add_common_args(parser)
    parser.add_argument("-l",
                        dest="print_format",
                        default=1,
                        nargs="?",
                        type=int,
                        help="use a long listing format")
    parser.add_argument("-d",
                        "--depth",
                        default=1,
                        type=int,
                        help="Browse depth")

    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = client.get_node(args.nodeid)
        if args.path:
            node = node.get_child(args.path.split(","))
        print("Browsing node {} at {}\n".format(node, args.url))
        if args.print_format == 0:
            print("{:30} {:25}".format("DisplayName", "NodeId"))
            print("")
        elif args.print_format == 1:
            print("{:30} {:25} {:25} {:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
            print("")
        _lsprint(client, node.nodeid, args.print_format, args.depth - 1)
    finally:
        # Disconnect even when browsing failed part-way through.
        client.disconnect()
    sys.exit(0)
277
278
279
def _lsprint(client, nodeid, print_format, depth, indent=""):
280
    indent += "  "
281
    pnode = client.get_node(nodeid)
282
    if print_format in (0, 1):
283
        for desc in pnode.get_children_descriptions():
284
            if print_format == 0:
285
                print("{:30} {:25}".format(indent + desc.DisplayName.to_string(), indent + desc.NodeId.to_string()))
286
            else:
287
                if desc.NodeClass == ua.NodeClass.Variable:
288
                    val = client.get_node(desc.NodeId).get_value()
289
                    print("{:30}, {!s:25}, {!s:25}, {!s:3}".format(indent + desc.DisplayName.to_string(), indent + desc.NodeId.to_string(), indent + desc.BrowseName.to_string(), indent + str(val)))
290
                else:
291
                    print("{:30}, {!s:25}, {!s:25}".format(indent + desc.DisplayName.to_string(), indent + desc.NodeId.to_string(), indent + desc.BrowseName.to_string()))
292
            if depth:
293
                _lsprint(client, desc.NodeId, print_format, depth - 1, indent)
294
    else:
295
        for node in pnode.get_children():
296
            attrs = node.get_attributes([ua.AttributeIds.DisplayName, 
297
                                         ua.AttributeIds.BrowseName,
298
                                         ua.AttributeIds.NodeClass,
299
                                         ua.AttributeIds.WriteMask,
300
                                         ua.AttributeIds.UserWriteMask,
301
                                         ua.AttributeIds.DataType,
302
                                         ua.AttributeIds.Value])
303
            name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
304
            update = attrs[-1].ServerTimestamp
305
            if nclass == ua.NodeClass.Variable:
306
                print("{} {}, {}, {}, {}, {update}, {value}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update=update, value=val))
307
            else:
308
                print("{} {}, {}, {}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
309
            if depth:
310
                _lsprint(client, node.nodeid, print_format, depth - 1, indent)
311
312
313
class SubHandler(object):
    """Subscription handler that prints every notification it receives."""

    def data_change(self, handle, node, val, attr):
        """Print the details of a data-change notification."""
        print("New data change event", handle, node, val, attr)

    def event(self, handle, event):
        """Print the details of an event notification."""
        print("New event", handle, event)
320
321
322
def uasubscribe():
    """Command-line entry point: subscribe to a node and print notifications.

    Parses the command line, connects to the server given by ``--url``,
    resolves the node from ``--nodeid`` and the optional ``--path``, creates a
    subscription handled by ``SubHandler`` and subscribes to data changes or
    events according to ``--eventtype``.  An interactive console is then
    started; notifications are printed while it is running.

    Always terminates the process through ``sys.exit``: status 1 when no node
    was selected, status 0 once the console has been left.
    """
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
    add_common_args(parser)
    parser.add_argument("-t",
                        "--eventtype",
                        dest="eventtype",
                        default="datachange",
                        choices=['datachange', 'event'],
                        help="Event type to subscribe to")

    args = parser.parse_args()
    if args.nodeid == "i=84" and args.path == "":
        parser.print_usage()
        print("uasubscribe: error: The NodeId or BrowsePath of a variable is required")
        sys.exit(1)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = client.get_node(args.nodeid)
        if args.path:
            node = node.get_child(args.path.split(","))
        handler = SubHandler()
        sub = client.create_subscription(500, handler)
        if args.eventtype == "datachange":
            sub.subscribe_data_change(node)
        else:
            sub.subscribe_events(node)
        # Give the console a private namespace holding the module globals plus
        # this function's locals (client, node, sub, ...).  The original passed
        # the builtin ``vars`` function here instead of a dict, and updated
        # ``globals()`` in place.
        namespace = dict(globals())
        namespace.update(locals())
        shell = code.InteractiveConsole(namespace)
        shell.interact()
    finally:
        # Disconnect even when the subscription or the console failed.
        client.disconnect()
    sys.exit(0)
359
360
361
# Converts a numeric value to its enum name.
def enum_to_string(klass, value):
    """Return a readable name for *value*.

    :param klass: class searched for a constant equal to *value* when *value*
        is not an ``Enum`` member.
    :param value: an ``Enum`` member or a plain constant.
    :returns: the member name for an ``Enum`` value; otherwise the name of the
        first attribute of *klass* (not starting with ``__``) equal to
        *value*; otherwise an ``Unknown <class> (<value>)`` string.
    """
    if isinstance(value, Enum):
        return value.name

    # Not an Enum member: look for a class constant carrying this value.
    candidates = (attr_name for attr_name, attr_value in vars(klass).items()
                  if not attr_name.startswith('__') and attr_value == value)
    found = next(candidates, None)
    if found is not None:
        return found
    return 'Unknown {} ({})'.format(klass.__name__, value)
371
372
373
def application_to_strings(app):
    """Describe an application description as ``(label, value)`` pairs.

    The application URI is always included; the product URI, name, type,
    gateway server URI and discovery profile URI are included only when
    truthy; one pair is appended for every discovery URL.

    :param app: object exposing the application description attributes read
        below.
    :returns: list of ``(label, value)`` tuples.
    """
    rows = [('Application URI', app.ApplicationUri)]
    optional_rows = [
        ('Product URI', app.ProductUri),
        ('Application Name', app.ApplicationName.to_string()),
        ('Application Type', enum_to_string(ua.ApplicationType, app.ApplicationType)),
        ('Gateway Server URI', app.GatewayServerUri),
        ('Discovery Profile URI', app.DiscoveryProfileUri),
    ]
    # Skip optional fields that are empty.
    rows.extend(pair for pair in optional_rows if pair[1])
    rows.extend(('Discovery URL', url) for url in app.DiscoveryUrls)
    return rows
389
390
391
def endpoint_to_strings(ep):
    """Describe an endpoint description as ``(label, value)`` pairs.

    The list holds the endpoint URL, the pairs from
    ``application_to_strings`` for the endpoint's server, the certificate
    length and security settings, a group of pairs per user identity token,
    and finally the transport profile URI and security level.

    :param ep: object exposing the endpoint description attributes read below.
    :returns: list of ``(label, value)`` tuples.
    """
    rows = [('Endpoint URL', ep.EndpointUrl)]
    rows.extend(application_to_strings(ep.Server))
    # Only the size of the certificate is reported, not its content.
    rows.append(('Server Certificate', len(ep.ServerCertificate)))
    rows.append(('Security Mode', enum_to_string(ua.MessageSecurityMode, ep.SecurityMode)))
    rows.append(('Security Policy URI', ep.SecurityPolicyUri))

    for token in ep.UserIdentityTokens:
        rows.append(('User policy', token.PolicyId))
        rows.append(('  Token type', enum_to_string(ua.UserTokenType, token.TokenType)))
        if token.IssuedTokenType or token.IssuerEndpointUrl:
            rows.append(('  Issued Token type', token.IssuedTokenType))
            rows.append(('  Issuer Endpoint URL', token.IssuerEndpointUrl))
        if token.SecurityPolicyUri:
            rows.append(('  Security Policy URI', token.SecurityPolicyUri))

    rows.append(('Transport Profile URI', ep.TransportProfileUri))
    rows.append(('Security Level', ep.SecurityLevel))
    return rows
412
413
414
def uadiscover():
    """Command-line entry point: print the servers and endpoints of a server.

    Parses the command line, asks the server given by ``--url`` for all known
    servers and then for its endpoints, and prints each one as an indented
    list of ``label: value`` lines.  Terminates the process through
    ``sys.exit`` with status 0.
    """
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
    add_minimum_args(parser)
    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    def dump(header, number, to_pairs, item):
        # Print the numbered header first, then one line per pair, then a blank line.
        print(header.format(number))
        for label, value in to_pairs(item):
            print('  {}: {}'.format(label, value))
        print('')

    client = Client(args.url, timeout=args.timeout)
    print("Performing discovery at {}\n".format(args.url))
    for number, server in enumerate(client.find_all_servers(), start=1):
        dump('Server {}:', number, application_to_strings, server)

    # A fresh client is created for the endpoint query, as in the original flow.
    client = Client(args.url, timeout=args.timeout)
    for number, endpoint in enumerate(client.get_server_endpoints(), start=1):
        dump('Endpoint {}:', number, endpoint_to_strings, endpoint)

    sys.exit(0)
436
437
def print_history(res):
    """Print the data values of a history-read result as a table.

    Nothing is printed unless the result's type identifier is
    ``ua.ObjectIds.HistoryData_Encoding_DefaultBinary``.

    :param res: history result exposing ``TypeId.Identifier`` and ``Body``.
    """
    if res.TypeId.Identifier != ua.ObjectIds.HistoryData_Encoding_DefaultBinary:
        return

    row = "{:30} {:10} {}"
    payload = ua.utils.Buffer(res.Body)
    print(row.format('Source timestamp', 'Status', 'Value'))
    history = ua.HistoryData.from_binary(payload)
    for data_value in history.DataValues:
        print(row.format(str(data_value.SourceTimestamp), data_value.StatusCode.name, data_value.Value))
443
444
def str_to_datetime(s):
    """Parse a command-line date/time string.

    :param s: text formatted as ``YYYY-MM-DD``, ``YYYY-MM-DD HH:MM`` or
        ``YYYY-MM-DD HH:MM:SS``; an empty value selects the current time.
    :returns: a naive ``datetime``; for an empty *s* this is
        ``datetime.utcnow()``.
    :raises ValueError: when *s* is non-empty and matches none of the
        supported formats.
    """
    if not s:
        return datetime.utcnow()
    # Try the supported formats from least to most specific.
    for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            continue
    # The original fell off the end here and returned None, which callers then
    # passed on as a time bound without noticing the malformed input.
    raise ValueError("Invalid date/time {!r}, expected YYYY-MM-DD [HH:MM[:SS]]".format(s))
453
454
def uahistoryread():
    """Command-line entry point: read and print the raw history of a node.

    Parses the command line, connects to the server given by ``--url``,
    resolves the node from ``--nodeid`` and the optional ``--path``, converts
    ``--starttime`` and ``--endtime`` with ``str_to_datetime`` and prints the
    raw history between them using ``print_history``.

    Always terminates the process through ``sys.exit``: status 1 when no node
    was selected, status 0 after the history was printed.
    """
    parser = argparse.ArgumentParser(description="Read history of a node")
    add_common_args(parser)
    parser.add_argument(
        "--starttime",
        default="",
        help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time",
    )
    parser.add_argument(
        "--endtime",
        default="",
        help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time",
    )

    args = parser.parse_args()
    no_node_selected = args.nodeid == "i=84" and args.path == ""
    if no_node_selected:
        parser.print_usage()
        print("uahistoryread: error: A NodeId or BrowsePath is required")
        sys.exit(1)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        target = client.get_node(args.nodeid)
        if args.path:
            target = target.get_child(args.path.split(","))
        starttime = str_to_datetime(args.starttime)
        endtime = str_to_datetime(args.endtime)
        print("Reading raw history of node {} at {}; start at {}, end at {}\n".format(target, args.url, starttime, endtime))
        history = target.read_raw_history(starttime, endtime)
        print_history(history)
    finally:
        # Disconnect even when reading the history failed.
        client.disconnect()
    sys.exit(0)
484