Completed
Push — dev ( 17cd27...3b343f )
by Olivier
02:12
created

opcua.multiply()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 4
rs 10
1
import logging
2
import sys
3
import argparse
4
from datetime import datetime
5
from enum import Enum
6
7
try:
    from IPython import embed
except ImportError:
    import code
    import inspect

    def embed():
        """Start a plain ``code`` console when IPython is not installed.

        The console namespace holds this module's globals plus the globals
        and local variables of the function that called ``embed()``, so
        names created by the caller can be used from the prompt.
        """
        namespace = dict(globals())
        frame = inspect.currentframe()
        # currentframe() may return None on interpreters without frame support.
        caller = frame.f_back if frame is not None else None
        if caller is not None:
            namespace.update(caller.f_globals)
            namespace.update(caller.f_locals)
        code.interact(local=namespace)
14
15
from opcua import ua
16
from opcua import Client
17
from opcua import Server
18
from opcua import Node
19
from opcua import uamethod
20
21
22
def add_minimum_args(parser):
    """Register the options shared by every tool on ``parser``.

    Adds ``-u/--url`` (server URL), ``-v/--verbose`` (log level name stored
    in ``loglevel``) and ``--timeout`` (socket timeout as an integer).
    """
    parser.add_argument("-u",
                        "--url",
                        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
                        default='opc.tcp://localhost:4841',
                        metavar="URL")
    parser.add_argument("-v",
                        "--verbose",
                        dest="loglevel",
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        default='WARNING',
                        help="Set log level")
    parser.add_argument("--timeout",
                        dest="timeout",
                        type=int,
                        default=1,
                        help="Set socket timeout (NOT the diverse UA timeouts)")
39
40
41
def add_common_args(parser):
    """Register the minimum options plus the node-selection options.

    On top of ``add_minimum_args`` this adds ``-n/--nodeid`` (default
    ``i=84``), ``-p/--path`` (comma separated browse path, default empty)
    and ``-i/--namespace`` (integer, default 0).
    """
    add_minimum_args(parser)
    parser.add_argument("-n",
                        "--nodeid",
                        help="Fully-qualified node ID (for example: i=85). Default: root node",
                        default='i=84',
                        metavar="NODE")
    parser.add_argument("-p",
                        "--path",
                        help="Comma separated browse path to the node starting at NODE (for example: 3:MyObject,3:MyVariable)",
                        default='',
                        metavar="BROWSEPATH")
    parser.add_argument("-i",
                        "--namespace",
                        help="Default namespace",
                        type=int,
                        default=0,
                        metavar="NAMESPACE")
59
60
61
def get_node(client, args):
    """Return the node selected by ``args.nodeid`` and ``args.path``.

    The node is looked up through ``client.get_node``; when ``args.path`` is
    non-empty it is split on commas and resolved with ``get_child`` starting
    from that node.
    """
    node = client.get_node(args.nodeid)
    if args.path:
        node = node.get_child(args.path.split(","))
    return node
65
66
67
def uaread():
    """Command-line entry point: read one attribute of a node and print it.

    Exits with status 1 when neither a node id nor a browse path was given
    while reading the Value attribute, and with status 0 after a read.
    """
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to read")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="python",
                        choices=['python', 'variant', 'datavalue'],
                        help="Data type to return")

    args = parser.parse_args()
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
        parser.print_usage()
        print("uaread: error: A NodeId or BrowsePath is required")
        sys.exit(1)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = client.get_node(args.nodeid)
        if args.path:
            node = node.get_child(args.path.split(","))
        attr = node.get_attribute(args.attribute)
        # "python" unwraps DataValue -> Variant -> value, "variant" unwraps
        # one level, anything else prints the full DataValue.
        if args.datatype == "python":
            print(attr.Value.Value)
        elif args.datatype == "variant":
            print(attr.Value)
        else:
            print(attr)
    finally:
        client.disconnect()
    sys.exit(0)
107
108
109
def _args_to_array(val, array):
110
    if array == "guess":
111
        if "," in val:
112
            array = "true"
113
    if array == "true":
114
        val = val.split(",")
115
    return val
116
117
118
def _arg_to_bool(val):
119
    if val in ("true", "True"):
120
        return True
121
    else:
122
        return False
123
124
125
def _arg_to_variant(val, array, ptype, varianttype=None):
    """Convert a command-line string into a ``ua.Variant``.

    ``val`` is first split into a list according to ``array`` (see
    ``_args_to_array``), then every element (or the single value) is
    converted with the callable ``ptype``. When ``varianttype`` is given it
    is passed to ``ua.Variant`` explicitly; otherwise ``ua.Variant`` is
    built from the value alone. Exceptions raised by ``ptype`` propagate.
    """
    val = _args_to_array(val, array)
    if isinstance(val, list):
        val = [ptype(item) for item in val]
    else:
        val = ptype(val)
    if varianttype:
        return ua.Variant(val, varianttype)
    return ua.Variant(val)
135
136
137
def _val_to_variant(val, args):
    """Build the ``ua.Variant`` to write from the string ``val``.

    ``args.array`` selects array handling ("guess", "true", "false") and
    ``args.datatype`` selects the target type. The data type name is matched
    case-insensitively so the lowercase names offered on the command line
    ("bytestring", "xmlelement", "localizedtext", ...) reach their branch.

    With "guess" the value is tried as bool, int, float and finally str.

    Raises:
        NotImplementedError: for the "datetime" data type.
        ValueError: if the data type is unknown, or if the value cannot be
            converted to the requested numeric type.
    """
    array = args.array
    datatype = args.datatype.lower()

    def to_bytes(text):
        # bytes(str) needs an explicit encoding on Python 3.
        return text if isinstance(text, bytes) else text.encode("utf-8")

    if datatype == "guess":
        if val in ("true", "True", "false", "False"):
            return _arg_to_variant(val, array, _arg_to_bool)
        try:
            return _arg_to_variant(val, array, int)
        except ValueError:
            try:
                return _arg_to_variant(val, array, float)
            except ValueError:
                return _arg_to_variant(val, array, str)
    elif datatype == "bool":
        if val in ("1", "True", "true"):
            return ua.Variant(True, ua.VariantType.Boolean)
        else:
            return ua.Variant(False, ua.VariantType.Boolean)
    elif datatype == "sbyte":
        return _arg_to_variant(val, array, int, ua.VariantType.SByte)
    elif datatype == "byte":
        return _arg_to_variant(val, array, int, ua.VariantType.Byte)
    elif datatype == "uint16":
        return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
    elif datatype == "uint32":
        return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
    elif datatype == "uint64":
        return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
    elif datatype == "int16":
        return _arg_to_variant(val, array, int, ua.VariantType.Int16)
    elif datatype == "int32":
        return _arg_to_variant(val, array, int, ua.VariantType.Int32)
    elif datatype == "int64":
        return _arg_to_variant(val, array, int, ua.VariantType.Int64)
    elif datatype == "float":
        return _arg_to_variant(val, array, float, ua.VariantType.Float)
    elif datatype == "double":
        return _arg_to_variant(val, array, float, ua.VariantType.Double)
    elif datatype == "string":
        return _arg_to_variant(val, array, str, ua.VariantType.String)
    elif datatype == "datetime":
        raise NotImplementedError
    elif datatype == "guid":
        # NOTE(review): the value is passed on as raw bytes, as before;
        # confirm the representation ua.VariantType.Guid expects.
        return _arg_to_variant(val, array, to_bytes, ua.VariantType.Guid)
    elif datatype == "bytestring":
        return _arg_to_variant(val, array, to_bytes, ua.VariantType.ByteString)
    elif datatype in ("xml", "xmlelement"):
        return _arg_to_variant(val, array, str, ua.VariantType.XmlElement)
    elif datatype == "nodeid":
        return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
    elif datatype == "expandednodeid":
        return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
    elif datatype == "statuscode":
        return _arg_to_variant(val, array, int, ua.VariantType.StatusCode)
    elif datatype in ("qualifiedname", "browsename"):
        return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
    elif datatype == "localizedtext":
        return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantType.LocalizedText)
    # Fail loudly instead of silently returning None.
    raise ValueError("Unsupported data type: {}".format(args.datatype))
199
200
201
def uawrite():
    """Command-line entry point: write a value to one attribute of a node.

    Exits with status 1 when neither a node id nor a browse path was given
    while writing the Value attribute, and with status 0 after the write.
    """
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
    add_common_args(parser)
    parser.add_argument("-a",
                        "--attribute",
                        dest="attribute",
                        type=int,
                        default=ua.AttributeIds.Value,
                        help="Set attribute to write")
    parser.add_argument("-l",
                        "--list",
                        "--array",
                        dest="array",
                        default="guess",
                        choices=["guess", "true", "false"],
                        help="Value is an array")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="guess",
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
                        help="Data type of the value to write")
    parser.add_argument("value",
                        help="Value to be written",
                        metavar="VALUE")
    args = parser.parse_args()
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
        parser.print_usage()
        print("uawrite: error: A NodeId or BrowsePath is required")
        sys.exit(1)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = client.get_node(args.nodeid)
        if args.path:
            node = node.get_child(args.path.split(","))
        val = _val_to_variant(args.value, args)
        node.set_attribute(args.attribute, ua.DataValue(val))
    finally:
        client.disconnect()
    sys.exit(0)
245
246
247
def uals():
    """Command-line entry point: browse below a node and print its children.

    ``-l`` selects the listing format: 0 is the short listing, 1 (the
    default when ``-l`` is absent) adds browse name and value, and any other
    number, including a bare ``-l`` which means 3, uses the long listing.
    ``-d/--depth`` is the number of levels to browse.
    """
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
    add_common_args(parser)
    parser.add_argument("-l",
                        dest="long_format",
                        const=3,
                        default=1,
                        nargs="?",
                        type=int,
                        help="use a long listing format")
    parser.add_argument("-d",
                        "--depth",
                        default=1,
                        type=int,
                        help="Browse depth")

    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = client.get_node(args.nodeid)
        if args.path:
            node = node.get_child(args.path.split(","))
        print("Browsing node {} at {}\n".format(node, args.url))
        # The printers take the number of additional levels to descend.
        remaining = args.depth - 1
        if args.long_format == 0:
            _lsprint_0(node, remaining)
        elif args.long_format == 1:
            _lsprint_1(node, remaining)
        else:
            _lsprint_long(node, remaining)
    finally:
        client.disconnect()
    sys.exit(0)
284
285
286
def _lsprint_0(node, depth, indent=""):
287
    if not indent:
288
        print("{:30} {:25}".format("DisplayName", "NodeId"))
289
        print("")
290
    for desc in node.get_children_descriptions():
291
        print("{}{:30} {:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
292
        if depth:
293
            _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
294
295
296
def _lsprint_1(node, depth, indent=""):
297
    if not indent:
298
        print("{:30} {:25} {:25} {:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
299
        print("")
300
301
    for desc in node.get_children_descriptions():
302
        if desc.NodeClass == ua.NodeClass.Variable:
303
            val = Node(node.server, desc.NodeId).get_value()
304
            print("{}{:30} {!s:25} {!s:25}, {!s:3}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string(), val))
305
        else:
306
            print("{}{:30} {!s:25} {!s:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string()))
307
        if depth:
308
            _lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
309
310
311
def _lsprint_long(pnode, depth, indent=""):
312
    if not indent:
313
        print("{:30} {:25} {:25} {:10} {:30} {:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
314
        print("")
315
    for node in pnode.get_children():
316
        attrs = node.get_attributes([ua.AttributeIds.DisplayName, 
317
                                     ua.AttributeIds.BrowseName,
318
                                     ua.AttributeIds.NodeClass,
319
                                     ua.AttributeIds.WriteMask,
320
                                     ua.AttributeIds.UserWriteMask,
321
                                     ua.AttributeIds.DataType,
322
                                     ua.AttributeIds.Value])
323
        name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
324
        update = attrs[-1].ServerTimestamp
325
        if nclass == ua.NodeClass.Variable:
326
            print("{}{:30} {:25} {:25} {:10} {!s:30} {!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
327
        else:
328
            print("{}{:30} {:25} {:25}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
329
        if depth:
330
            _lsprint_long(node, depth - 1, indent + "  ")
331
332
333
class SubHandler(object):
    """Subscription handler that prints every notification it receives."""

    def data_change(self, handle, node, val, attr):
        """Print a data change notification."""
        print("New data change event", handle, node, val, attr)

    def event(self, handle, event):
        """Print an event notification."""
        print("New event", handle, event)
340
341
342
def uasubscribe():
    """Command-line entry point: subscribe to a node and print notifications.

    After subscribing, an interactive shell is started with ``embed()``;
    notifications are printed by ``SubHandler`` while the shell is open.
    Exits with status 1 when neither a node id nor a browse path was given.
    """
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
    add_common_args(parser)
    parser.add_argument("-t",
                        "--eventtype",
                        dest="eventtype",
                        default="datachange",
                        choices=['datachange', 'event'],
                        help="Event type to subscribe to")

    args = parser.parse_args()
    if args.nodeid == "i=84" and args.path == "":
        parser.print_usage()
        print("uasubscribe: error: The NodeId or BrowsePath of a variable is required")
        sys.exit(1)
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = client.get_node(args.nodeid)
        if args.path:
            node = node.get_child(args.path.split(","))
        handler = SubHandler()
        sub = client.create_subscription(500, handler)
        if args.eventtype == "datachange":
            sub.subscribe_data_change(node)
        else:
            sub.subscribe_events(node)
        # Keeps the process, and with it the subscription, alive.
        embed()
    finally:
        client.disconnect()
    sys.exit(0)
376
377
378
def enum_to_string(klass, value):
    """Convert a numeric value to the name it has in ``klass``.

    ``value`` may already be an ``Enum`` member, in which case its name is
    returned. Otherwise, if ``klass`` is an ``Enum`` subclass the member
    with that value is looked up; if ``klass`` is a plain class, the first
    attribute (not starting with ``__``) equal to ``value`` is used.

    Returns ``'Unknown <class name> (<value>)'`` when nothing matches.
    """
    if isinstance(value, Enum):
        return value.name
    if isinstance(klass, type) and issubclass(klass, Enum):
        try:
            return klass(value).name
        except ValueError:
            pass
    # if value is not a subtype of Enum, try to find a constant
    # with this value in this class
    for attr_name, attr_value in vars(klass).items():
        if not attr_name.startswith('__') and attr_value == value:
            return attr_name
    return 'Unknown {} ({})'.format(klass.__name__, value)
388
389
390
def application_to_strings(app):
    """Return ``(label, value)`` pairs describing an application description.

    The application URI is always included; product URI, name, type,
    gateway server URI and discovery profile URI only when they are truthy.
    One "Discovery URL" pair is added per entry of ``app.DiscoveryUrls``.
    """
    result = [('Application URI', app.ApplicationUri)]
    optionals = [
        ('Product URI', app.ProductUri),
        ('Application Name', app.ApplicationName.to_string()),
        ('Application Type', enum_to_string(ua.ApplicationType, app.ApplicationType)),
        ('Gateway Server URI', app.GatewayServerUri),
        ('Discovery Profile URI', app.DiscoveryProfileUri),
    ]
    result.extend((label, value) for (label, value) in optionals if value)
    # A missing list is treated like an empty one.
    for url in app.DiscoveryUrls or []:
        result.append(('Discovery URL', url))
    return result
406
407
408
def endpoint_to_strings(ep):
    """Return ``(label, value)`` pairs describing an endpoint description.

    The list contains the endpoint URL, the pairs of the server application
    (see ``application_to_strings``), the certificate length, security mode
    and policy, one group of pairs per user identity token, and finally the
    transport profile URI and security level.
    """
    certificate = ep.ServerCertificate
    result = [('Endpoint URL', ep.EndpointUrl)]
    result += application_to_strings(ep.Server)
    result += [
        # A missing certificate is reported as length 0 instead of crashing.
        ('Server Certificate', len(certificate) if certificate else 0),
        ('Security Mode', enum_to_string(ua.MessageSecurityMode, ep.SecurityMode)),
        ('Security Policy URI', ep.SecurityPolicyUri)]
    for tok in ep.UserIdentityTokens or []:
        result += [
            ('User policy', tok.PolicyId),
            ('  Token type', enum_to_string(ua.UserTokenType, tok.TokenType))]
        if tok.IssuedTokenType or tok.IssuerEndpointUrl:
            result += [
                ('  Issued Token type', tok.IssuedTokenType),
                ('  Issuer Endpoint URL', tok.IssuerEndpointUrl)]
        if tok.SecurityPolicyUri:
            result.append(('  Security Policy URI', tok.SecurityPolicyUri))
    result += [
        ('Transport Profile URI', ep.TransportProfileUri),
        ('Security Level', ep.SecurityLevel)]
    return result
429
430
431
def uaclient():
    """Command-line entry point: connect to a server and open a Python shell.

    The local variables ``client``, ``root``, ``objects`` and ``mynode`` are
    created for use from the interactive shell started by ``embed()``.
    """
    parser = argparse.ArgumentParser(description="Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
    add_common_args(parser)
    parser.add_argument("-c",
                        "--certificate",
                        help="set client certificate")
    parser.add_argument("-k",
                        "--private_key",
                        help="set client private key")
    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    # Certificate and key must be in place before the session is created,
    # so they are loaded before connecting.
    if args.certificate:
        client.load_client_certificate(args.certificate)
    if args.private_key:
        client.load_private_key(args.private_key)
    client.connect()
    try:
        # These names are not used below; they exist for the shell user.
        root = client.get_root_node()
        objects = client.get_objects_node()
        mynode = client.get_node(args.nodeid)
        if args.path:
            mynode = mynode.get_child(args.path.split(","))
        embed()
    finally:
        client.disconnect()
    sys.exit(0)
459
460
461
462
463
def uaserver():
    """Command-line entry point: run an example server with a Python shell.

    ``-x/--xml`` imports node definitions from an XML file. ``-p/--populate``
    adds a sample object with a variable, an array variable, a property and
    a ``multiply`` method. The server runs until the shell started by
    ``embed()`` is left.
    """
    parser = argparse.ArgumentParser(description="Run an example OPC-UA server. By importing xml definition and using uawrite, it is even possible to expose real data using this server")
    add_minimum_args(parser)
    parser.add_argument("-x",
                        "--xml",
                        help="Populate address space with nodes defined in XML")
    parser.add_argument("-p",
                        "--populate",
                        action="store_true",
                        help="populate address space with some sample nodes")
    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    server = Server()
    server.set_endpoint(args.url)
    server.set_server_name("FreeOpcUa Example Server")
    if args.xml:
        server.import_xml(args.xml)
    if args.populate:
        @uamethod
        def multiply(parent, x, y):
            """Sample method: print both arguments and return their product."""
            print("multiply method call with parameters: ", x, y)
            return x * y

        uri = "http://examples.freeopcua.github.io"
        idx = server.register_namespace(uri)
        objects = server.get_objects_node()
        myobj = objects.add_object(idx, "MyObject")
        myvar = myobj.add_variable(idx, "MyVariable", 6.7)
        myvar.set_writable()    # Set MyVariable to be writable by clients
        myarrayvar = myobj.add_variable(idx, "MyVarArray", [6.7, 7.9])
        myarrayvar.set_writable()    # Set MyVarArray to be writable by clients
        myprop = myobj.add_property(idx, "MyProperty", "I am a property")
        # multiply takes two inputs and returns their product, so declare
        # two input arguments and a numeric output.
        mymethod = myobj.add_method(idx, "MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])

    server.start()
    try:
        embed()
    finally:
        server.stop()
    sys.exit(0)
504
505
506
507
508
509
def uadiscover():
    """Command-line entry point: print the servers and endpoints of a server.

    With ``-n/--network`` a FindServersOnNetwork request is sent first.
    Afterwards the results of ``find_all_servers()`` and
    ``get_server_endpoints()`` are printed as ``label: value`` lines.
    """
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
    add_minimum_args(parser)
    parser.add_argument("-n",
                        "--network",
                        action="store_true",
                        help="Also send a FindServersOnNetwork request to server")
    # NOTE: options to toggle the FindServers (-s/--servers) and
    # GetEndpoints (-e/--endpoints) requests are not implemented yet.
    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    if args.network:
        client = Client(args.url, timeout=args.timeout)
        print("Performing discovery at {}\n".format(args.url))
        for i, server in enumerate(client.find_all_servers_on_network(), start=1):
            # NOTE: only the running number is printed for network results.
            print('Server {}:'.format(i))
            print('')

    # A fresh client is created for every discovery request.
    client = Client(args.url, timeout=args.timeout)
    print("Performing discovery at {}\n".format(args.url))
    for i, server in enumerate(client.find_all_servers(), start=1):
        print('Server {}:'.format(i))
        for (n, v) in application_to_strings(server):
            print('  {}: {}'.format(n, v))
        print('')

    client = Client(args.url, timeout=args.timeout)
    for i, ep in enumerate(client.get_server_endpoints(), start=1):
        print('Endpoint {}:'.format(i))
        for (n, v) in endpoint_to_strings(ep):
            print('  {}: {}'.format(n, v))
        print('')

    sys.exit(0)
552
553
554
def print_history(o):
    """Print a table of the data values in a ``ua.HistoryData`` result.

    One row per entry of ``o.DataValues`` with source timestamp, status code
    name and value. Nothing is printed for any other type of ``o``.
    """
    if isinstance(o, ua.HistoryData):
        row = "{:30} {:10} {}"
        print(row.format('Source timestamp', 'Status', 'Value'))
        for d in o.DataValues:
            print(row.format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
559
560
561
def str_to_datetime(s):
    """Parse ``s`` as ``YYYY-MM-DD [HH:MM[:SS]]`` into a naive ``datetime``.

    An empty (or otherwise falsy) ``s`` yields the current UTC time.

    Raises:
        ValueError: if ``s`` matches none of the supported formats.
    """
    if not s:
        return datetime.utcnow()
    # try different datetime formats
    for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            continue
    # Reject bad input instead of silently returning None.
    raise ValueError("Invalid date/time {!r}, expected YYYY-MM-DD [HH:MM[:SS]]".format(s))
570
571
572
def uahistoryread():
    """Command-line entry point: read and print the raw history of a node.

    ``--starttime`` and ``--endtime`` are parsed with ``str_to_datetime``
    before connecting; an unparsable value ends the program with an argparse
    usage error. Exits with status 1 when neither a node id nor a browse
    path was given, and with status 0 after the history was printed.
    """
    parser = argparse.ArgumentParser(description="Read history of a node")
    add_common_args(parser)
    parser.add_argument("--starttime",
                        default="",
                        help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
    parser.add_argument("--endtime",
                        default="",
                        help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")

    args = parser.parse_args()
    if args.nodeid == "i=84" and args.path == "":
        parser.print_usage()
        print("uahistoryread: error: A NodeId or BrowsePath is required")
        sys.exit(1)

    # Validate the time range before opening a connection. A parse failure
    # is handled whether it is reported as an exception or as None.
    try:
        starttime = str_to_datetime(args.starttime)
        endtime = str_to_datetime(args.endtime)
    except ValueError as ex:
        parser.error(str(ex))
    if starttime is None or endtime is None:
        parser.error("start and end time must be formatted as YYYY-MM-DD [HH:MM[:SS]]")
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))

    client = Client(args.url, timeout=args.timeout)
    client.connect()
    try:
        node = client.get_node(args.nodeid)
        if args.path:
            node = node.get_child(args.path.split(","))
        print("Reading raw history of node {} at {}; start at {}, end at {}\n".format(node, args.url, starttime, endtime))
        print_history(node.read_raw_history(starttime, endtime))
    finally:
        client.disconnect()
    sys.exit(0)
602