Completed
Push — master ( 606348...b2f67f )
by Olivier
02:54
created

opcua.uaread()   B

Complexity

Conditions 6

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 37
rs 7.5385
1
import logging
2
import sys
3
import argparse
4
from datetime import datetime
5
from enum import Enum
6
import math
7
import time
8
9
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open a plain interactive console when IPython is not installed.

        The console namespace is built from the globals and locals of the
        function that called ``embed()``, so variables prepared by the caller
        can be used from the prompt.
        """
        # locals() evaluated here would only describe this (empty) function
        # scope, so look one frame up to reach the caller's variables.
        caller = sys._getframe(1)
        namespace = dict(caller.f_globals)
        namespace.update(caller.f_locals)
        code.interact(local=namespace)
16
17
from opcua import ua
18
from opcua import Client
19
from opcua import Server
20
from opcua import Node
21
from opcua import uamethod
22
23
24
def add_minimum_args(parser):
    """Register the url, log level and socket timeout options on *parser*.

    Args:
        parser: an ``argparse.ArgumentParser`` (or compatible) instance.
    """
    log_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']

    parser.add_argument(
        "-u", "--url",
        metavar="URL",
        default='opc.tcp://localhost:4841',
        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)")
    parser.add_argument(
        "-v", "--verbose",
        dest="loglevel",
        choices=log_levels,
        default='WARNING',
        help="Set log level")
    parser.add_argument(
        "--timeout",
        dest="timeout",
        type=int,
        default=1,
        help="Set socket timeout (NOT the diverse UA timeouts)")
41
42
43
def add_common_args(parser):
    """Register the minimum options plus the node selection options.

    Adds ``--nodeid``, ``--path`` and ``--namespace`` on top of what
    ``add_minimum_args`` registers.

    Args:
        parser: an ``argparse.ArgumentParser`` (or compatible) instance.
    """
    add_minimum_args(parser)

    parser.add_argument(
        "-n", "--nodeid",
        metavar="NODE",
        default='i=84',
        help="Fully-qualified node ID (for example: i=85). Default: root node")
    parser.add_argument(
        "-p", "--path",
        metavar="BROWSEPATH",
        default='',
        help="Comma separated browse path to the node starting at NODE (for example: 3:Mybject,3:MyVariable)")
    parser.add_argument(
        "-i", "--namespace",
        metavar="NAMESPACE",
        type=int,
        default=0,
        help="Default namespace")
61
62
63
def parse_args(parser):
    """Parse the command line, configure logging and normalise the URL.

    If the given URL contains no ``://`` the default OPC UA scheme
    (``ua.OPC_TCP_SCHEME``) is prepended.

    Args:
        parser: a fully configured argument parser.

    Returns:
        The parsed argument namespace.
    """
    parsed = parser.parse_args()
    level = getattr(logging, parsed.loglevel)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=level)

    url = parsed.url
    if url and '://' not in url:
        logging.info("Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, url)
        parsed.url = ua.OPC_TCP_SCHEME + '://' + url
    return parsed
70
71
def get_node(client, args):
    """Return the node selected by ``args.nodeid`` and ``args.path``.

    The node id is resolved through ``client.get_node``; when a browse path
    is given it is split on commas and followed from that node.
    """
    start = client.get_node(args.nodeid)
    if not args.path:
        return start
    return start.get_child(args.path.split(","))
76
77
78
def uaread():
    """Command line entry point: read one attribute of a node and print it.

    The output form is chosen with ``--datatype``: the bare python value,
    the variant, or the complete data value. The process always terminates
    through ``sys.exit``: status 1 when neither a node id, a browse path nor
    a non-default attribute was given, status 0 after a successful read.
    """
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to read")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="python",
                        choices=['python', 'variant', 'datavalue'],
                        help="Data type to return")

    args = parse_args(parser)
    # All defaults untouched means the user did not select anything to read.
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
        parser.print_usage()
        print("uaread: error: A NodeId or BrowsePath is required")
        sys.exit(1)

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = get_node(client, args)
        attr = node.get_attribute(args.attribute)
        if args.datatype == "python":
            print(attr.Value.Value)
        elif args.datatype == "variant":
            print(attr.Value)
        else:
            print(attr)
    finally:
        client.disconnect()
    sys.exit(0)
115
116
117
def _args_to_array(val, array):
118
    if array == "guess":
119
        if "," in val:
120
            array = "true"
121
    if array == "true":
122
        val = val.split(",")
123
    return val
124
125
126
def _arg_to_bool(val):
127
    if val in ("true", "True"):
128
        return True
129
    else:
130
        return False
131
132
133
def _arg_to_variant(val, array, ptype, varianttype=None):
    """Convert a command line string into a ``ua.Variant``.

    Args:
        val: the raw string value.
        array: array mode handed to ``_args_to_array``.
        ptype: callable converting one string item to the python value.
        varianttype: optional variant type passed to ``ua.Variant``; when
            falsy the variant is built from the value alone.
    """
    parsed = _args_to_array(val, array)
    if isinstance(parsed, list):
        converted = [ptype(item) for item in parsed]
    else:
        converted = ptype(parsed)

    if not varianttype:
        return ua.Variant(converted)
    return ua.Variant(converted, varianttype)
143
144
145
def _val_to_variant(val, args):
    """Convert the command line string *val* into a ``ua.Variant``.

    ``args.datatype`` selects the target type and ``args.array`` is handed to
    ``_arg_to_variant`` to decide whether *val* is a comma separated list.
    Type names are matched case-insensitively so that both the lowercase
    names offered on the command line (``bytestring``, ``xmlelement``,
    ``localizedtext``) and the historical spellings (``ByteString``,
    ``xml``, ``LocalizedText``, ``Guid``) are understood.

    With ``guess`` the value is tried as bool, int, float and finally str.

    Raises:
        NotImplementedError: for the ``datetime`` type.
        ValueError: for a type name that is not handled, or when the value
            cannot be converted to an explicitly requested type.
    """
    def _to_bytes(text):
        # bytes("abc") raises TypeError on Python 3, so encode explicitly.
        return text if isinstance(text, bytes) else text.encode("utf-8")

    array = args.array
    datatype = args.datatype.lower()

    if datatype == "guess":
        if val in ("true", "True", "false", "False"):
            return _arg_to_variant(val, array, _arg_to_bool)
        try:
            return _arg_to_variant(val, array, int)
        except ValueError:
            try:
                return _arg_to_variant(val, array, float)
            except ValueError:
                return _arg_to_variant(val, array, str)

    if datatype == "bool":
        return ua.Variant(val in ("1", "True", "true"), ua.VariantType.Boolean)

    if datatype == "datetime":
        raise NotImplementedError

    # Types needing only a plain converter: name -> (converter, VariantType member).
    # The member is looked up by name so that only the requested one is touched.
    simple = {
        "sbyte": (int, "SByte"),
        "byte": (int, "Byte"),
        "uint16": (int, "UInt16"),
        "uint32": (int, "UInt32"),
        "uint64": (int, "UInt64"),
        "int16": (int, "Int16"),
        "int32": (int, "Int32"),
        "int64": (int, "Int64"),
        "float": (float, "Float"),
        "double": (float, "Double"),
        "string": (str, "String"),
        # NOTE(review): Guid is passed as raw bytes as before; confirm the
        # value type the library expects for Guid variants.
        "guid": (_to_bytes, "Guid"),
        "bytestring": (_to_bytes, "ByteString"),
        "xml": (str, "XmlElement"),
        "xmlelement": (str, "XmlElement"),
        "statuscode": (int, "StatusCode"),
    }
    if datatype in simple:
        ptype, type_name = simple[datatype]
        return _arg_to_variant(val, array, ptype, getattr(ua.VariantType, type_name))

    if datatype == "nodeid":
        return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
    if datatype == "expandednodeid":
        return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
    if datatype in ("qualifiedname", "browsename"):
        return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
    if datatype == "localizedtext":
        return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantType.LocalizedText)

    # Previously an unhandled name fell through and returned None, which was
    # then written to the server as an empty value.
    raise ValueError("Unsupported data type: {}".format(args.datatype))
207
208
209
def uawrite():
    """Command line entry point: write a value to one attribute of a node.

    The positional VALUE is converted with ``_val_to_variant`` according to
    ``--datatype`` and ``--array`` and written wrapped in a ``ua.DataValue``.
    The process always terminates through ``sys.exit``: status 1 when no
    node was selected, status 0 after a successful write.
    """
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to write")
    parser.add_argument("-l",
                        "--list",
                        "--array",
                        dest="array",
                        default="guess",
                        choices=["guess", "true", "false"],
                        help="Value is an array")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="guess",
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
                        help="Data type of the value to write")
    parser.add_argument("value",
                        help="Value to be written",
                        metavar="VALUE")
    args = parse_args(parser)
    # All defaults untouched means the user did not select a target node.
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
        parser.print_usage()
        print("uawrite: error: A NodeId or BrowsePath is required")
        sys.exit(1)

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = get_node(client, args)
        val = _val_to_variant(args.value, args)
        node.set_attribute(args.attribute, ua.DataValue(val))
    finally:
        client.disconnect()
    sys.exit(0)
250
251
252
def uals():
    """Command line entry point: browse a node and print its children.

    ``-l`` selects the listing format: 0 prints names and node ids, 1 (the
    default when ``-l`` is absent) adds browse names and values, any other
    number (3 when ``-l`` is given without a number) prints the long format.
    ``--depth`` sets how many levels are browsed. Exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
    add_common_args(parser)
    parser.add_argument("-l",
                        dest="long_format",
                        const=3,
                        nargs="?",
                        type=int,
                        help="use a long listing format")
    parser.add_argument("-d",
                        "--depth",
                        default=1,
                        type=int,
                        help="Browse depth")

    args = parse_args(parser)
    if args.long_format is None:
        args.long_format = 1

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = get_node(client, args)
        print("Browsing node {} at {}\n".format(node, args.url))
        # The printers take the number of additional levels to descend.
        if args.long_format == 0:
            _lsprint_0(node, args.depth - 1)
        elif args.long_format == 1:
            _lsprint_1(node, args.depth - 1)
        else:
            _lsprint_long(node, args.depth - 1)
    finally:
        client.disconnect()
    sys.exit(0)
286
287
288
def _lsprint_0(node, depth, indent=""):
289
    if not indent:
290
        print("{:30} {:25}".format("DisplayName", "NodeId"))
291
        print("")
292
    for desc in node.get_children_descriptions():
293
        print("{}{:30} {:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
294
        if depth:
295
            _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
296
297
298
def _lsprint_1(node, depth, indent=""):
    """Print display name, node id, browse name and value of child nodes.

    The value column is only filled for children of node class Variable.

    Args:
        node: node whose children descriptions are listed.
        depth: number of further levels to descend into.
        indent: prefix for each row; the header is printed only when empty.
    """
    if not indent:
        print("{:30} {:25} {:25} {:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
        print("")

    for child in node.get_children_descriptions():
        is_variable = child.NodeClass == ua.NodeClass.Variable
        if is_variable:
            # Read the value before formatting anything, as a read failure
            # should surface before a partial row is built.
            value = Node(node.server, child.NodeId).get_value()
        columns = (indent,
                   child.DisplayName.to_string(),
                   child.NodeId.to_string(),
                   child.BrowseName.to_string())
        if is_variable:
            print("{}{:30} {!s:25} {!s:25}, {!s:3}".format(*(columns + (value,))))
        else:
            print("{}{:30} {!s:25} {!s:25}".format(*columns))
        if depth:
            _lsprint_1(Node(node.server, child.NodeId), depth - 1, indent + "  ")
311
312
313
def _lsprint_long(pnode, depth, indent=""):
    """Print a detailed listing of the children of *pnode*.

    For every child several attributes are read in one request. Variables
    are printed with data type, server timestamp and value; other nodes
    with display name, node id and browse name only.

    Args:
        pnode: node whose children are listed.
        depth: number of further levels to descend into.
        indent: prefix for each row; the header is printed only when empty.
    """
    if not indent:
        print("{:30} {:25} {:25} {:10} {:30} {:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
        print("")
    for node in pnode.get_children():
        attrs = node.get_attributes([ua.AttributeIds.DisplayName,
                                     ua.AttributeIds.BrowseName,
                                     ua.AttributeIds.NodeClass,
                                     ua.AttributeIds.WriteMask,
                                     ua.AttributeIds.UserWriteMask,
                                     ua.AttributeIds.DataType,
                                     ua.AttributeIds.Value])
        # The two write masks are requested but not displayed.
        name, bname, nclass, _mask, _umask, dtype, val = [attr.Value.Value for attr in attrs]
        # The timestamp shown is the one returned with the Value attribute.
        update = attrs[-1].ServerTimestamp
        if nclass == ua.NodeClass.Variable:
            print("{}{:30} {:25} {:25} {:10} {!s:30} {!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
        else:
            # Same column order as the header: DisplayName, NodeId, BrowseName.
            print("{}{:30} {:25} {:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string()))
        if depth:
            _lsprint_long(node, depth - 1, indent + "  ")
333
334
335
class SubHandler(object):
    """Subscription handler that prints every notification it receives."""

    def data_change(self, handle, node, val, attr):
        """Print the arguments of a data change notification."""
        print("New data change event", handle, node, val, attr)

    def event(self, handle, event):
        """Print the arguments of an event notification."""
        print("New event", handle, event)
342
343
344
def uasubscribe():
    """Command line entry point: subscribe to a node and print notifications.

    Depending on ``--eventtype`` a data change or an event subscription is
    created with a ``SubHandler`` as handler, then an interactive shell is
    started; the client is disconnected once the shell returns. Exits with
    status 1 when no node was selected, otherwise with status 0.
    """
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
    add_common_args(parser)
    parser.add_argument("-t",
                        "--eventtype",
                        dest="eventtype",
                        default="datachange",
                        choices=['datachange', 'event'],
                        help="Event type to subscribe to")

    args = parse_args(parser)
    if args.nodeid == "i=84" and args.path == "":
        parser.print_usage()
        print("uasubscribe: error: The NodeId or BrowsePath of a variable is required")
        sys.exit(1)

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = get_node(client, args)
        handler = SubHandler()
        sub = client.create_subscription(500, handler)
        if args.eventtype == "datachange":
            sub.subscribe_data_change(node)
        else:
            sub.subscribe_events(node)
        # Block in an interactive shell while notifications are printed.
        embed()
    finally:
        client.disconnect()
    sys.exit(0)
375
376
377
def enum_to_string(klass, value):
    """Convert a numeric value to the name it has in *klass*.

    Enum members return their own name. For any other value the first
    attribute of *klass* that does not start with a double underscore and
    compares equal to *value* is returned; when none matches a descriptive
    ``'Unknown ...'`` string is returned instead.
    """
    if isinstance(value, Enum):
        return value.name

    # Not an Enum member: look for a class constant holding this value.
    match = next(
        (attr for attr, const in vars(klass).items()
         if not attr.startswith('__') and const == value),
        None)
    if match is not None:
        return match
    return 'Unknown {} ({})'.format(klass.__name__, value)
387
388
389
def application_to_strings(app):
    """Describe an application description as a list of (label, value) pairs.

    The application URI always comes first; the optional fields are included
    only when they hold a truthy value, followed by one entry per discovery
    URL.
    """
    rows = [('Application URI', app.ApplicationUri)]

    optional_rows = (
        ('Product URI', app.ProductUri),
        ('Application Name', app.ApplicationName.to_string()),
        ('Application Type', enum_to_string(ua.ApplicationType, app.ApplicationType)),
        ('Gateway Server URI', app.GatewayServerUri),
        ('Discovery Profile URI', app.DiscoveryProfileUri),
    )
    rows.extend(pair for pair in optional_rows if pair[1])
    rows.extend(('Discovery URL', url) for url in app.DiscoveryUrls)
    return rows
405
406
407
def endpoint_to_strings(ep):
    """Describe an endpoint description as a list of (label, value) pairs.

    The result holds the endpoint URL, the server application details,
    the security settings, one group of entries per user identity token and
    finally the transport profile and security level.
    """
    rows = [('Endpoint URL', ep.EndpointUrl)]
    rows.extend(application_to_strings(ep.Server))
    rows.append(('Server Certificate', len(ep.ServerCertificate)))
    rows.append(('Security Mode', enum_to_string(ua.MessageSecurityMode, ep.SecurityMode)))
    rows.append(('Security Policy URI', ep.SecurityPolicyUri))

    for token in ep.UserIdentityTokens:
        rows.append(('User policy', token.PolicyId))
        rows.append(('  Token type', enum_to_string(ua.UserTokenType, token.TokenType)))
        if token.IssuedTokenType or token.IssuerEndpointUrl:
            rows.append(('  Issued Token type', token.IssuedTokenType))
            rows.append(('  Issuer Endpoint URL', token.IssuerEndpointUrl))
        if token.SecurityPolicyUri:
            rows.append(('  Security Policy URI', token.SecurityPolicyUri))

    rows.append(('Transport Profile URI', ep.TransportProfileUri))
    rows.append(('Security Level', ep.SecurityLevel))
    return rows
428
429
430
def uaclient():
    """Command line entry point: connect to a server and open a python shell.

    ``root``, ``objects`` and ``mynode`` are prepared as local variables for
    use from the shell. An optional client certificate and private key can
    be given with ``-c`` and ``-k``. Exits with status 0 when the shell
    returns.
    """
    parser = argparse.ArgumentParser(description="Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
    add_common_args(parser)
    parser.add_argument("-c",
                        "--certificate",
                        help="set client certificate")
    parser.add_argument("-k",
                        "--private_key",
                        help="set client private key")
    args = parse_args(parser)

    client = Client(args.url, timeout=args.timeout)
    # Certificate and key are loaded before connecting; loading them after
    # the session is already open presumably has no effect on it.
    # NOTE(review): confirm against the Client implementation in use.
    if args.certificate:
        # NOTE(review): method name kept as in the original; verify that the
        # Client class provides it (it may be load_client_certificate).
        client.load_certificate(args.certificate)
    if args.private_key:
        # The key was previously handed to the certificate loader.
        client.load_private_key(args.private_key)
    client.connect()
    try:
        root = client.get_root_node()
        objects = client.get_objects_node()
        mynode = get_node(client, args)
        embed()
    finally:
        client.disconnect()
    sys.exit(0)
455
456
457
def uaserver():
    """Command line entry point: run an example OPC-UA server.

    The address space can be filled from an XML file (``--xml``) and/or with
    a few sample nodes (``--populate``). With ``--shell`` an interactive
    shell is started; otherwise the server runs until interrupted and, when
    sample nodes exist, changes their values once per second.

    Note that ``--populate`` now enables the sample nodes, as its name and
    help text say; it used to be registered with ``store_false``, so the
    nodes were created by default and the flag removed them.
    """
    parser = argparse.ArgumentParser(description="Run an example OPC-UA server. By importing xml definition and using uawrite command line, it is even possible to expose real data using this server")
    # we setup a server, this is a bit different from other tool so we do not reuse common arguments
    parser.add_argument("-u",
                        "--url",
                        help="URL of OPC UA server, default is opc.tcp://0.0.0.0:4841",
                        default='opc.tcp://0.0.0.0:4841',
                        metavar="URL")
    parser.add_argument("-v",
                        "--verbose",
                        dest="loglevel",
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        default='WARNING',
                        help="Set log level")
    parser.add_argument("-x",
                        "--xml",
                        metavar="XML_FILE",
                        help="Populate address space with nodes defined in XML")
    parser.add_argument("-p",
                        "--populate",
                        action="store_true",
                        help="Populate address space with some sample nodes")
    parser.add_argument("-s",
                        "--shell",
                        action="store_true",
                        help="Start python shell instead of randomly changing node values")
    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    server = Server()
    server.set_endpoint(args.url)
    server.set_server_name("FreeOpcUa Example Server")
    if args.xml:
        server.import_xml(args.xml)
    if args.populate:
        @uamethod
        def multiply(parent, x, y):
            print("multiply method call with parameters: ", x, y)
            return x * y

        uri = "http://examples.freeopcua.github.io"
        idx = server.register_namespace(uri)
        objects = server.get_objects_node()
        myobj = objects.add_object(idx, "MyObject")
        mywritablevar = myobj.add_variable(idx, "MyWritableVariable", 6.7)
        mywritablevar.set_writable()    # Set MyVariable to be writable by clients
        myvar = myobj.add_variable(idx, "MyVariable", 6.7)
        myarrayvar = myobj.add_variable(idx, "MyVarArray", [6.7, 7.9])
        myprop = myobj.add_property(idx, "MyProperty", "I am a property")
        mymethod = myobj.add_method(idx, "MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])

    server.start()
    try:
        if args.shell:
            embed()
        elif args.populate:
            count = 0
            while True:
                time.sleep(1)
                myvar.set_value(math.sin(count / 10))
                myarrayvar.set_value([math.sin(count / 10), math.sin(count / 100)])
                count += 1
        else:
            # No sample nodes exist, so there is nothing to update; just keep
            # the server alive instead of failing on the undefined variables.
            while True:
                time.sleep(1)
    finally:
        server.stop()
    sys.exit(0)
522
523
524
525
526
527
def uadiscover():
    """Command line entry point: print servers and endpoints found at a URL.

    With ``--network`` the servers known on the network are requested
    first. The registered servers and the endpoints of the server are
    always printed. Exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
    add_minimum_args(parser)
    parser.add_argument("-n",
                        "--network",
                        action="store_true",
                        help="Also send a FindServersOnNetwork request to server")
    args = parse_args(parser)

    def show_pairs(pairs):
        # One indented "label: value" line per pair.
        for label, value in pairs:
            print('  {}: {}'.format(label, value))

    banner = "Performing discovery at {}\n".format(args.url)

    if args.network:
        client = Client(args.url, timeout=args.timeout)
        print(banner)
        # Only a numbered header is printed for servers found on the network.
        for index, _server in enumerate(client.find_all_servers_on_network(), start=1):
            print('Server {}:'.format(index))
            print('')

    client = Client(args.url, timeout=args.timeout)
    print(banner)
    for index, server in enumerate(client.find_all_servers(), start=1):
        print('Server {}:'.format(index))
        show_pairs(application_to_strings(server))
        print('')

    client = Client(args.url, timeout=args.timeout)
    for index, endpoint in enumerate(client.get_server_endpoints(), start=1):
        print('Endpoint {}:'.format(index))
        show_pairs(endpoint_to_strings(endpoint))
        print('')

    sys.exit(0)
569
570
571
def print_history(o):
    """Print a table of the data values of *o* if it is a ``ua.HistoryData``.

    Objects of any other type are ignored.
    """
    if not isinstance(o, ua.HistoryData):
        return

    row = "{:30} {:10} {}"
    print(row.format('Source timestamp', 'Status', 'Value'))
    for data_value in o.DataValues:
        print(row.format(str(data_value.SourceTimestamp), data_value.StatusCode.name, data_value.Value))
576
577
578
def str_to_datetime(s):
    """Parse a ``YYYY-MM-DD [HH:MM[:SS]]`` string into a ``datetime``.

    Args:
        s: the string to parse; an empty value means "now".

    Returns:
        The parsed naive datetime, or ``datetime.utcnow()`` when *s* is empty.

    Raises:
        ValueError: if *s* matches none of the supported formats. Previously
            None was returned silently in that case.
    """
    if not s:
        return datetime.utcnow()
    # try different datetime formats
    for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            pass
    raise ValueError(
        "Invalid date/time {!r}, expected YYYY-MM-DD [HH:MM[:SS]]".format(s))
587
588
589
def uahistoryread():
    """Command line entry point: read and print the raw history of a node.

    ``--starttime`` and ``--endtime`` are converted with ``str_to_datetime``.
    Exits with status 1 when no node was selected, otherwise with status 0.
    """
    time_format = "formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time"

    parser = argparse.ArgumentParser(description="Read history of a node")
    add_common_args(parser)
    parser.add_argument("--starttime",
                        default="",
                        help="Start time, " + time_format)
    parser.add_argument("--endtime",
                        default="",
                        help="End time, " + time_format)

    args = parse_args(parser)
    no_node_selected = args.nodeid == "i=84" and args.path == ""
    if no_node_selected:
        parser.print_usage()
        print("uahistoryread: error: A NodeId or BrowsePath is required")
        sys.exit(1)

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = get_node(client, args)
        starttime = str_to_datetime(args.starttime)
        endtime = str_to_datetime(args.endtime)
        message = "Reading raw history of node {} at {}; start at {}, end at {}\n"
        print(message.format(node, args.url, starttime, endtime))
        history = node.read_raw_history(starttime, endtime)
        print_history(history)
    finally:
        client.disconnect()
    sys.exit(0)
616