Completed
Pull Request — master (#494)
by Olivier
05:47 queued 01:20
created

uageneratestructs()   B

Complexity

Conditions 2

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 2
dl 0
loc 25
ccs 0
cts 11
cp 0
crap 6
rs 8.8571
c 1
b 0
f 1
1
import os
2
import logging
3
import sys
4
import argparse
5
from datetime import datetime, timedelta
6
import math
7
import time
8
9
try:
10
    from IPython import embed
11
except ImportError:
12
    import code
13
14
    def embed():
15
        code.interact(local=dict(globals(), **locals()))
16
17
from opcua import ua
18
from opcua import Client
19
from opcua import Server
20
from opcua import Node
21
from opcua import uamethod
22
from opcua.common.structures_generator import StructGenerator
23
24
25
def add_minimum_args(parser):
26
    parser.add_argument("-u",
27
                        "--url",
28
                        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
29
                        default='opc.tcp://localhost:4840',
30
                        metavar="URL")
31
    parser.add_argument("-v",
32
                        "--verbose",
33
                        dest="loglevel",
34
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
35
                        default='WARNING',
36
                        help="Set log level")
37
    parser.add_argument("--timeout",
38
                        dest="timeout",
39
                        type=int,
40
                        default=1,
41
                        help="Set socket timeout (NOT the diverse UA timeouts)")
42
43
44
def add_common_args(parser, default_node='i=84'):
45
    add_minimum_args(parser)
46
    parser.add_argument("-n",
47
                        "--nodeid",
48
                        help="Fully-qualified node ID (for example: i=85). Default: root node",
49
                        default=default_node,
50
                        metavar="NODE")
51
    parser.add_argument("-p",
52
                        "--path",
53
                        help="Comma separated browse path to the node starting at NODE (for example: 3:Mybject,3:MyVariable)",
54
                        default='',
55
                        metavar="BROWSEPATH")
56
    parser.add_argument("-i",
57
                        "--namespace",
58
                        help="Default namespace",
59
                        type=int,
60
                        default=0,
61
                        metavar="NAMESPACE")
62
    parser.add_argument("--security",
63
                        help="Security settings, for example: Basic256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None",
64
                        default='')
65
66
67
def _require_nodeid(parser, args):
68
    # check that a nodeid has been given explicitly, a bit hackish...
69
    if args.nodeid == "i=84" and args.path == "":
70
        parser.print_usage()
71
        print("{0}: error: A NodeId or BrowsePath is required".format(parser.prog))
72
        sys.exit(1)
73
74
75
def parse_args(parser, requirenodeid=False):
76
    args = parser.parse_args()
77
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
78
    if args.url and '://' not in args.url:
79
        logging.info("Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, args.url)
80
        args.url = ua.OPC_TCP_SCHEME + '://' + args.url
81
    if requirenodeid:
82
        _require_nodeid(parser, args)
83
    return args
84
85
86
def get_node(client, args):
87
    node = client.get_node(args.nodeid)
88
    if args.path:
89
        path = args.path.split(",")
90
        if node.nodeid == ua.NodeId(84, 0) and path[0] == "0:Root":
91
            # let user specify root if not node given
92
            path = path[1:]
93
        node = node.get_child(path)
94
    return node
95 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
96
97
def uaread():
98
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
99
    add_common_args(parser)
100
    parser.add_argument("-a",
101
                        "--attribute",
102
                        dest="attribute",
103
                        type=int,
104
                        default=ua.AttributeIds.Value,
105
                        help="Set attribute to read")
106
    parser.add_argument("-t",
107
                        "--datatype",
108
                        dest="datatype",
109
                        default="python",
110
                        choices=['python', 'variant', 'datavalue'],
111
                        help="Data type to return")
112
113
    args = parse_args(parser, requirenodeid=True)
114
115
    client = Client(args.url, timeout=args.timeout)
116
    client.set_security_string(args.security)
117
    client.connect()
118
    try:
119
        node = get_node(client, args)
120
        attr = node.get_attribute(args.attribute)
121
        if args.datatype == "python":
122
            print(attr.Value.Value)
123
        elif args.datatype == "variant":
124
            print(attr.Value)
125
        else:
126
            print(attr)
127
    finally:
128
        client.disconnect()
129
    sys.exit(0)
130
    print(args)
131
132
133
def _args_to_array(val, array):
134
    if array == "guess":
135
        if "," in val:
136
            array = "true"
137
    if array == "true":
138
        val = val.split(",")
139
    return val
140
141
142
def _arg_to_bool(val):
143
    return val in ("true", "True")
144
145
146
def _arg_to_variant(val, array, ptype, varianttype=None):
147
    val = _args_to_array(val, array)
148
    if isinstance(val, list):
149
        val = [ptype(i) for i in val]
150
    else:
151
        val = ptype(val)
152
    if varianttype:
153
        return ua.Variant(val, varianttype)
154
    else:
155
        return ua.Variant(val)
156
157
158
def _val_to_variant(val, args):
159
    array = args.array
160
    if args.datatype == "guess":
161
        if val in ("true", "True", "false", "False"):
162
            return _arg_to_variant(val, array, _arg_to_bool)
163
        try:
164
            return _arg_to_variant(val, array, int)
165
        except ValueError:
166
            try:
167
                return _arg_to_variant(val, array, float)
168
            except ValueError:
169
                return _arg_to_variant(val, array, str)
170
    elif args.datatype == "bool":
171
        if val in ("1", "True", "true"):
172
            return ua.Variant(True, ua.VariantType.Boolean)
173
        else:
174
            return ua.Variant(False, ua.VariantType.Boolean)
175
    elif args.datatype == "sbyte":
176
        return _arg_to_variant(val, array, int, ua.VariantType.SByte)
177
    elif args.datatype == "byte":
178
        return _arg_to_variant(val, array, int, ua.VariantType.Byte)
179
    #elif args.datatype == "uint8":
180
        #return _arg_to_variant(val, array, int, ua.VariantType.Byte)
181
    elif args.datatype == "uint16":
182
        return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
183
    elif args.datatype == "uint32":
184
        return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
185
    elif args.datatype == "uint64":
186
        return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
187
    #elif args.datatype == "int8":
188
        #return ua.Variant(int(val), ua.VariantType.Int8)
189
    elif args.datatype == "int16":
190
        return _arg_to_variant(val, array, int, ua.VariantType.Int16)
191
    elif args.datatype == "int32":
192
        return _arg_to_variant(val, array, int, ua.VariantType.Int32)
193
    elif args.datatype == "int64":
194
        return _arg_to_variant(val, array, int, ua.VariantType.Int64)
195
    elif args.datatype == "float":
196
        return _arg_to_variant(val, array, float, ua.VariantType.Float)
197
    elif args.datatype == "double":
198
        return _arg_to_variant(val, array, float, ua.VariantType.Double)
199
    elif args.datatype == "string":
200
        return _arg_to_variant(val, array, str, ua.VariantType.String)
201
    elif args.datatype == "datetime":
202
        raise NotImplementedError
203
    elif args.datatype == "Guid":
204
        return _arg_to_variant(val, array, bytes, ua.VariantType.Guid)
205
    elif args.datatype == "ByteString":
206
        return _arg_to_variant(val, array, bytes, ua.VariantType.ByteString)
207
    elif args.datatype == "xml":
208
        return _arg_to_variant(val, array, str, ua.VariantType.XmlElement)
209
    elif args.datatype == "nodeid":
210
        return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
211
    elif args.datatype == "expandednodeid":
212
        return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
213
    elif args.datatype == "statuscode":
214
        return _arg_to_variant(val, array, int, ua.VariantType.StatusCode)
215
    elif args.datatype in ("qualifiedname", "browsename"):
216
        return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
217
    elif args.datatype == "LocalizedText":
218
        return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantType.LocalizedText)
219
220
221
def uawrite():
222
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
223
    add_common_args(parser)
224
    parser.add_argument("-a",
225
                        "--attribute",
226
                        dest="attribute",
227
                        type=int,
228
                        default=ua.AttributeIds.Value,
229
                        help="Set attribute to read")
230
    parser.add_argument("-l",
231
                        "--list",
232
                        "--array",
233
                        dest="array",
234
                        default="guess",
235
                        choices=["guess", "true", "false"],
236
                        help="Value is an array")
237
    parser.add_argument("-t",
238
                        "--datatype",
239
                        dest="datatype",
240
                        default="guess",
241
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
242
                        help="Data type to return")
243
    parser.add_argument("value",
244
                        help="Value to be written",
245
                        metavar="VALUE")
246
    args = parse_args(parser, requirenodeid=True)
247
248
    client = Client(args.url, timeout=args.timeout)
249
    client.set_security_string(args.security)
250
    client.connect()
251
    try:
252
        node = get_node(client, args)
253
        val = _val_to_variant(args.value, args)
254
        node.set_attribute(args.attribute, ua.DataValue(val))
255
    finally:
256
        client.disconnect()
257
    sys.exit(0)
258
    print(args)
259 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
260
261
def uals():
262
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
263
    add_common_args(parser)
264
    parser.add_argument("-l",
265
                        dest="long_format",
266
                        const=3,
267
                        nargs="?",
268
                        type=int,
269
                        help="use a long listing format")
270
    parser.add_argument("-d",
271
                        "--depth",
272
                        default=1,
273
                        type=int,
274
                        help="Browse depth")
275
276
    args = parse_args(parser)
277
    if args.long_format is None:
278
        args.long_format = 1
279
280
    client = Client(args.url, timeout=args.timeout)
281
    client.set_security_string(args.security)
282
    client.connect()
283
    try:
284
        node = get_node(client, args)
285
        print("Browsing node {0} at {1}\n".format(node, args.url))
286
        if args.long_format == 0:
287
            _lsprint_0(node, args.depth - 1)
288
        elif args.long_format == 1:
289
            _lsprint_1(node, args.depth - 1)
290
        else:
291
            _lsprint_long(node, args.depth - 1)
292
    finally:
293
        client.disconnect()
294
    sys.exit(0)
295
    print(args)
296
297
298
def _lsprint_0(node, depth, indent=""):
299
    if not indent:
300
        print("{0:30} {1:25}".format("DisplayName", "NodeId"))
301
        print("")
302
    for desc in node.get_children_descriptions():
303
        print("{0}{1:30} {2:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
304
        if depth:
305
            _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
306
307
308
def _lsprint_1(node, depth, indent=""):
309
    if not indent:
310
        print("{0:30} {1:25} {2:25} {3:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
311
        print("")
312
313
    for desc in node.get_children_descriptions():
314
        if desc.NodeClass == ua.NodeClass.Variable:
315
            val = Node(node.server, desc.NodeId).get_value()
316
            print("{0}{1:30} {2!s:25} {3!s:25}, {4!s:3}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string(), val))
317
        else:
318
            print("{0}{1:30} {2!s:25} {3!s:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string()))
319
        if depth:
320
            _lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
321
322
323
def _lsprint_long(pnode, depth, indent=""):
324
    if not indent:
325
        print("{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
326
        print("")
327
    for node in pnode.get_children():
328
        attrs = node.get_attributes([ua.AttributeIds.DisplayName,
329
                                     ua.AttributeIds.BrowseName,
330
                                     ua.AttributeIds.NodeClass,
331
                                     ua.AttributeIds.WriteMask,
332
                                     ua.AttributeIds.UserWriteMask,
333
                                     ua.AttributeIds.DataType,
334
                                     ua.AttributeIds.Value])
335
        name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
336
        update = attrs[-1].ServerTimestamp
337
        if nclass == ua.NodeClass.Variable:
338
            print("{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
339
        else:
340
            print("{0}{1:30} {2:25} {3:25}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
341
        if depth:
342
            _lsprint_long(node, depth - 1, indent + "  ")
343
344
345
class SubHandler(object):
346
347
    def datachange_notification(self, node, val, data):
348
        print("New data change event", node, val, data)
349
350
    def event_notification(self, event):
351
        print("New event", event)
352
353
354
def uasubscribe():
355
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
356
    add_common_args(parser)
357
    parser.add_argument("-t",
358
                        "--eventtype",
359
                        dest="eventtype",
360
                        default="datachange",
361
                        choices=['datachange', 'event'],
362
                        help="Event type to subscribe to")
363
364
    args = parse_args(parser, requirenodeid=False)
365
    if args.eventtype == "datachange":
366
        _require_nodeid(parser, args)
367
    else:
368
        # FIXME: this is broken, someone may have written i=84 on purpose
369
        if args.nodeid == "i=84" and args.path == "":
370
            args.nodeid = "i=2253"
371
372
    client = Client(args.url, timeout=args.timeout)
373
    client.set_security_string(args.security)
374
    client.connect()
375
    try:
376
        node = get_node(client, args)
377
        handler = SubHandler()
378
        sub = client.create_subscription(500, handler)
379
        if args.eventtype == "datachange":
380
            sub.subscribe_data_change(node)
381
        else:
382
            sub.subscribe_events(node)
383
        print("Type Ctr-C to exit")
384
        while True:
385
            time.sleep(1)
386
    finally:
387
        client.disconnect()
388
    sys.exit(0)
389
    print(args)
390
391
392
def application_to_strings(app):
393
    result = []
394
    result.append(('Application URI', app.ApplicationUri))
395
    optionals = [
396
        ('Product URI', app.ProductUri),
397
        ('Application Name', app.ApplicationName.to_string()),
398
        ('Application Type', str(app.ApplicationType)),
399
        ('Gateway Server URI', app.GatewayServerUri),
400
        ('Discovery Profile URI', app.DiscoveryProfileUri),
401
    ]
402
    for (n, v) in optionals:
403
        if v:
404
            result.append((n, v))
405
    for url in app.DiscoveryUrls:
406
        result.append(('Discovery URL', url))
407
    return result  # ['{}: {}'.format(n, v) for (n, v) in result]
408
409
410
def cert_to_string(der):
411
    if not der:
412
        return '[no certificate]'
413
    try:
414
        from opcua.crypto import uacrypto
415
    except ImportError:
416
        return "{0} bytes".format(len(der))
417
    cert = uacrypto.x509_from_der(der)
418
    return uacrypto.x509_to_string(cert)
419
420
421
def endpoint_to_strings(ep):
422
    result = [('Endpoint URL', ep.EndpointUrl)]
423
    result += application_to_strings(ep.Server)
424
    result += [
425
        ('Server Certificate', cert_to_string(ep.ServerCertificate)),
426
        ('Security Mode', str(ep.SecurityMode)),
427
        ('Security Policy URI', ep.SecurityPolicyUri)]
428
    for tok in ep.UserIdentityTokens:
429
        result += [
430
            ('User policy', tok.PolicyId),
431
            ('  Token type', str(tok.TokenType))]
432
        if tok.IssuedTokenType or tok.IssuerEndpointUrl:
433
            result += [
434
                ('  Issued Token type', tok.IssuedTokenType),
435
                ('  Issuer Endpoint URL', tok.IssuerEndpointUrl)]
436
        if tok.SecurityPolicyUri:
437
            result.append(('  Security Policy URI', tok.SecurityPolicyUri))
438
    result += [
439
        ('Transport Profile URI', ep.TransportProfileUri),
440
        ('Security Level', ep.SecurityLevel)]
441
    return result
442
443
444
def uaclient():
445
    parser = argparse.ArgumentParser(description="Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
446
    add_common_args(parser)
447
    parser.add_argument("-c",
448
                        "--certificate",
449
                        help="set client certificate")
450
    parser.add_argument("-k",
451
                        "--private_key",
452
                        help="set client private key")
453
    args = parse_args(parser)
454
455
    client = Client(args.url, timeout=args.timeout)
456
    client.set_security_string(args.security)
457
    if args.certificate:
458
        client.load_client_certificate(args.certificate)
459
    if args.private_key:
460
        client.load_private_key(args.private_key)
461
    client.connect()
462
    try:
463
        root = client.get_root_node()
464
        objects = client.get_objects_node()
465
        mynode = get_node(client, args)
466
        embed()
467
    finally:
468
        client.disconnect()
469
    sys.exit(0)
470
471
472
def uaserver():
473
    parser = argparse.ArgumentParser(description="Run an example OPC-UA server. By importing xml definition and using uawrite command line, it is even possible to expose real data using this server")
474
    # we setup a server, this is a bit different from other tool so we do not reuse common arguments
475
    parser.add_argument("-u",
476
                        "--url",
477
                        help="URL of OPC UA server, default is opc.tcp://0.0.0.0:4840",
478
                        default='opc.tcp://0.0.0.0:4840',
479
                        metavar="URL")
480
    parser.add_argument("-v",
481
                        "--verbose",
482
                        dest="loglevel",
483
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
484
                        default='WARNING',
485
                        help="Set log level")
486
    parser.add_argument("-x",
487
                        "--xml",
488
                        metavar="XML_FILE",
489
                        help="Populate address space with nodes defined in XML")
490
    parser.add_argument("-p",
491
                        "--populate",
492
                        action="store_true",
493
                        help="Populate address space with some sample nodes")
494
    parser.add_argument("-c",
495
                        "--disable-clock",
496
                        action="store_true",
497
                        help="Disable clock, to avoid seeing many write if debugging an application")
498
    parser.add_argument("-s",
499
                        "--shell",
500
                        action="store_true",
501
                        help="Start python shell instead of randomly changing node values")
502
    parser.add_argument("--certificate",
503
                        help="set server certificate")
504
    parser.add_argument("--private_key",
505
                        help="set server private key")
506
    args = parser.parse_args()
507
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
508
509
    server = Server()
510
    server.set_endpoint(args.url)
511
    if args.certificate:
512
        server.load_certificate(args.certificate)
513
    if args.private_key:
514
        server.load_private_key(args.private_key)
515
    server.disable_clock(args.disable_clock)
516
    server.set_server_name("FreeOpcUa Example Server")
517
    if args.xml:
518
        server.import_xml(args.xml)
519
    if args.populate:
520
        @uamethod
521
        def multiply(parent, x, y):
522
            print("multiply method call with parameters: ", x, y)
523
            return x * y
524
525
        uri = "http://examples.freeopcua.github.io"
526
        idx = server.register_namespace(uri)
527
        objects = server.get_objects_node()
528
        myobj = objects.add_object(idx, "MyObject")
529
        mywritablevar = myobj.add_variable(idx, "MyWritableVariable", 6.7)
530
        mywritablevar.set_writable()    # Set MyVariable to be writable by clients
531
        myvar = myobj.add_variable(idx, "MyVariable", 6.7)
532
        myarrayvar = myobj.add_variable(idx, "MyVarArray", [6.7, 7.9])
533
        myprop = myobj.add_property(idx, "MyProperty", "I am a property")
534
        mymethod = myobj.add_method(idx, "MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])
535
536
    server.start()
537
    try:
538
        if args.shell:
539
            embed()
540
        elif args.populate:
541
            count = 0
542
            while True:
543
                time.sleep(1)
544
                myvar.set_value(math.sin(count / 10))
545
                myarrayvar.set_value([math.sin(count / 10), math.sin(count / 100)])
546
                count += 1
547
        else:
548
            while True:
549
                time.sleep(1)
550
    finally:
551
        server.stop()
552
    sys.exit(0)
553
554
555
def uadiscover():
556
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
557
    add_minimum_args(parser)
558
    parser.add_argument("-n",
559
                        "--network",
560
                        action="store_true",
561
                        help="Also send a FindServersOnNetwork request to server")
562
    #parser.add_argument("-s",
563
                        #"--servers",
564
                        #action="store_false",
565
                        #help="send a FindServers request to server")
566
    #parser.add_argument("-e",
567
                        #"--endpoints",
568
                        #action="store_false",
569
                        #help="send a GetEndpoints request to server")
570
    args = parse_args(parser)
571
572
    client = Client(args.url, timeout=args.timeout)
573
574
    if args.network:
575
        print("Performing discovery at {0}\n".format(args.url))
576
        for i, server in enumerate(client.connect_and_find_servers_on_network(), start=1):
577
            print('Server {0}:'.format(i))
578
            #for (n, v) in application_to_strings(server):
579
                #print('  {}: {}'.format(n, v))
580
            print('')
581
582
    print("Performing discovery at {0}\n".format(args.url))
583
    for i, server in enumerate(client.connect_and_find_servers(), start=1):
584
        print('Server {0}:'.format(i))
585
        for (n, v) in application_to_strings(server):
586
            print('  {0}: {1}'.format(n, v))
587
        print('')
588
589
    for i, ep in enumerate(client.connect_and_get_server_endpoints(), start=1):
590
        print('Endpoint {0}:'.format(i))
591
        for (n, v) in endpoint_to_strings(ep):
592
            print('  {0}: {1}'.format(n, v))
593
        print('')
594
595
    sys.exit(0)
596
597
598
def print_history(o):
599
    if isinstance(o, ua.HistoryData):
600
        print("{0:30} {1:10} {2}".format('Source timestamp', 'Status', 'Value'))
601
        for d in o.DataValues:
602
            print("{0:30} {1:10} {2}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
603
604
605
def str_to_datetime(s, default=None):
606
    if not s:
607
        if default is not None:
608
            return default
609
        return datetime.utcnow()
610
    # FIXME: try different datetime formats
611
    for fmt in ["%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"]:
612
        try:
613
            return datetime.strptime(s, fmt)
614
        except ValueError:
615
            pass
616
617
618
def uahistoryread():
619
    parser = argparse.ArgumentParser(description="Read history of a node")
620
    add_common_args(parser)
621
    parser.add_argument("--starttime",
622
                        default=None,
623
                        help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time - one day")
624
    parser.add_argument("--endtime",
625
                        default=None,
626
                        help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
627
    parser.add_argument("-e",
628
                        "--events",
629
                        action="store_true",
630
                        help="Read event history instead of data change history")
631
    parser.add_argument("-l",
632
                        "--limit",
633
                        type=int,
634
                        default=10,
635
                        help="Maximum number of notfication to return")
636
637
    args = parse_args(parser, requirenodeid=True)
638
639
    client = Client(args.url, timeout=args.timeout)
640
    client.set_security_string(args.security)
641
    client.connect()
642
    try:
643
        node = get_node(client, args)
644
        starttime = str_to_datetime(args.starttime, datetime.utcnow() - timedelta(days=1))
645
        endtime = str_to_datetime(args.endtime, datetime.utcnow())
646
        print("Reading raw history of node {0} at {1}; start at {2}, end at {3}\n".format(node, args.url, starttime, endtime))
647
        if args.events:
648
            evs = node.read_event_history(starttime, endtime, numvalues=args.limit)
649
            for ev in evs:
650
                print(ev)
651
        else:
652
            print_history(node.read_raw_history(starttime, endtime, numvalues=args.limit))
653
    finally:
654
        client.disconnect()
655
    sys.exit(0)
656
657
658
def uacall():
659
    parser = argparse.ArgumentParser(description="Call method of a node")
660
    add_common_args(parser)
661
    parser.add_argument("-m",
662
                        "--method",
663
                        dest="method",
664
                        type=int,
665
                        default=None,
666
                        help="Set method to call. If not given then (single) method of the selected node is used.")
667
    parser.add_argument("-l",
668
                        "--list",
669
                        "--array",
670
                        dest="array",
671
                        default="guess",
672
                        choices=["guess", "true", "false"],
673
                        help="Value is an array")
674
    parser.add_argument("-t",
675
                        "--datatype",
676
                        dest="datatype",
677
                        default="guess",
678
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
679
                        help="Data type to return")
680
    parser.add_argument("value",
681
                        help="Value to use for call to method, if any",
682
                        nargs="?",
683
                        metavar="VALUE")
684
685
    args = parse_args(parser, requirenodeid=True)
686
687
    client = Client(args.url, timeout=args.timeout)
688
    client.set_security_string(args.security)
689
    client.connect()
690
    try:
691
        node = get_node(client, args)
692
        # val must be a tuple in order to enable method calls without arguments
693
        if ( args.value is None ):
694
            val = () #empty tuple
695
        else:
696
            val = (_val_to_variant(args.value, args),) # tuple with one element
697
698
        # determine method to call: Either explicitly given or automatically select the method of the selected node.
699
        methods = node.get_methods()
700
        method_id = None
701
        #print( "methods=%s" % (methods) )
702
703
        if ( args.method is None ):
704
            if ( len( methods ) == 0 ):
705
                raise ValueError( "No methods in selected node and no method given" )
706
            elif ( len( methods ) == 1 ):
707
                method_id = methods[0]
708
            else:
709
                raise ValueError( "Selected node has {0:d} methods but no method given. Provide one of {1!s}".format(*(methods)) )
710
        else:
711
            for m in methods:
712
                if ( m.nodeid.Identifier == args.method ):
713
                    method_id = m.nodeid
714
                    break
715
716
        if ( method_id is None):
717
            # last resort:
718
            method_id = ua.NodeId( identifier=args.method )#, namespaceidx=? )#, nodeidtype=?): )
719
720
        #print( "method_id=%s\nval=%s" % (method_id,val) )
721
722
        result_variants = node.call_method( method_id, *val )
723
        print( "resulting result_variants={0!s}".format(result_variants) )
724
    finally:
725
        client.disconnect()
726
    sys.exit(0)
727
    print(args)
728
729
730
def uageneratestructs():
731
    parser = argparse.ArgumentParser(description="Generate a Python module from the xml structure definition (.bsd)")
732
    parser.add_argument("-i",
733
                        "--input",
734
                        dest="input_path",
735
                        required=True,
736
                        help="The xml file definning the custom OPC UA structures.",
737
                        )
738
    parser.add_argument("-o",
739
                        "--output",
740
                        dest="output_path",
741
                        required=True,
742
                        type=str,
743
                        default=None,
744
                        help="The python file to be generated.",
745
                        )
746
747
    args = parser.parse_args()
748
    if not os.path.isfile(args.output_path):
749
        parser.print_usage()
750
        return 1
751
752
    generator = StructGenerator()
753
    generator.make_model_from_file(args.input_path)
754
    generator.save_to_file(args.output_path, True)
755
756
757
758