Completed
Push — master ( 0141ac...5318bd )
by Olivier
05:05
created

uacall()   C

Complexity

Conditions 8

Size

Total Lines 70

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 72

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 8
c 1
b 0
f 0
dl 0
loc 70
ccs 0
cts 0
cp 0
crap 72
rs 5.1881

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
import logging
2
import sys
3
import argparse
4
from datetime import datetime, timedelta
5
import math
6
import time
7
8
try:
9
    from IPython import embed
10
except ImportError:
11
    import code
12
13
    def embed():
14
        code.interact(local=dict(globals(), **locals()))
15
16
from opcua import ua
17
from opcua import Client
18
from opcua import Server
19
from opcua import Node
20
from opcua import uamethod
21
22
23
def add_minimum_args(parser):
24
    parser.add_argument("-u",
25
                        "--url",
26
                        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
27
                        default='opc.tcp://localhost:4840',
28
                        metavar="URL")
29
    parser.add_argument("-v",
30
                        "--verbose",
31
                        dest="loglevel",
32
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
33
                        default='WARNING',
34
                        help="Set log level")
35
    parser.add_argument("--timeout",
36
                        dest="timeout",
37
                        type=int,
38
                        default=1,
39
                        help="Set socket timeout (NOT the diverse UA timeouts)")
40
41
42
def add_common_args(parser, default_node='i=84'):
43
    add_minimum_args(parser)
44
    parser.add_argument("-n",
45
                        "--nodeid",
46
                        help="Fully-qualified node ID (for example: i=85). Default: root node",
47
                        default=default_node,
48
                        metavar="NODE")
49
    parser.add_argument("-p",
50
                        "--path",
51
                        help="Comma separated browse path to the node starting at NODE (for example: 3:Mybject,3:MyVariable)",
52
                        default='',
53
                        metavar="BROWSEPATH")
54
    parser.add_argument("-i",
55
                        "--namespace",
56
                        help="Default namespace",
57
                        type=int,
58
                        default=0,
59
                        metavar="NAMESPACE")
60
    parser.add_argument("--security",
61
                        help="Security settings, for example: Basic256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None",
62
                        default='')
63
64
65
def _require_nodeid(parser, args):
66
    # check that a nodeid has been given explicitly, a bit hackish...
67
    if args.nodeid == "i=84" and args.path == "":
68
        parser.print_usage()
69
        print("{}: error: A NodeId or BrowsePath is required".format(parser.prog))
70
        sys.exit(1)
71
72
73
def parse_args(parser, requirenodeid=False):
74
    args = parser.parse_args()
75
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
76
    if args.url and '://' not in args.url:
77
        logging.info("Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, args.url)
78
        args.url = ua.OPC_TCP_SCHEME + '://' + args.url
79
    if requirenodeid:
80
        _require_nodeid(parser, args)
81
    return args
82
83
84
def get_node(client, args):
85
    node = client.get_node(args.nodeid)
86
    if args.path:
87
        path = args.path.split(",")
88
        if node.nodeid == ua.NodeId(84, 0) and path[0] == "0:Root":
89
            # let user specify root if not node given
90
            path = path[1:]
91
        node = node.get_child(path)
92
    return node
93
94
95 View Code Duplication
def uaread():
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
96
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
97
    add_common_args(parser)
98
    parser.add_argument("-a",
99
                        "--attribute",
100
                        dest="attribute",
101
                        type=int,
102
                        default=ua.AttributeIds.Value,
103
                        help="Set attribute to read")
104
    parser.add_argument("-t",
105
                        "--datatype",
106
                        dest="datatype",
107
                        default="python",
108
                        choices=['python', 'variant', 'datavalue'],
109
                        help="Data type to return")
110
111
    args = parse_args(parser, requirenodeid=True)
112
113
    client = Client(args.url, timeout=args.timeout)
114
    client.set_security_string(args.security)
115
    client.connect()
116
    try:
117
        node = get_node(client, args)
118
        attr = node.get_attribute(args.attribute)
119
        if args.datatype == "python":
120
            print(attr.Value.Value)
121
        elif args.datatype == "variant":
122
            print(attr.Value)
123
        else:
124
            print(attr)
125
    finally:
126
        client.disconnect()
127
    sys.exit(0)
128
    print(args)
129
130
131
def _args_to_array(val, array):
132
    if array == "guess":
133
        if "," in val:
134
            array = "true"
135
    if array == "true":
136
        val = val.split(",")
137
    return val
138
139
140
def _arg_to_bool(val):
141
    return val in ("true", "True")
142
143
144
def _arg_to_variant(val, array, ptype, varianttype=None):
145
    val = _args_to_array(val, array)
146
    if isinstance(val, list):
147
        val = [ptype(i) for i in val]
148
    else:
149
        val = ptype(val)
150
    if varianttype:
151
        return ua.Variant(val, varianttype)
152
    else:
153
        return ua.Variant(val)
154
155
156
def _val_to_variant(val, args):
157
    array = args.array
158
    if args.datatype == "guess":
159
        if val in ("true", "True", "false", "False"):
160
            return _arg_to_variant(val, array, _arg_to_bool)
161
        try:
162
            return _arg_to_variant(val, array, int)
163
        except ValueError:
164
            try:
165
                return _arg_to_variant(val, array, float)
166
            except ValueError:
167
                return _arg_to_variant(val, array, str)
168
    elif args.datatype == "bool":
169
        if val in ("1", "True", "true"):
170
            return ua.Variant(True, ua.VariantType.Boolean)
171
        else:
172
            return ua.Variant(False, ua.VariantType.Boolean)
173
    elif args.datatype == "sbyte":
174
        return _arg_to_variant(val, array, int, ua.VariantType.SByte)
175
    elif args.datatype == "byte":
176
        return _arg_to_variant(val, array, int, ua.VariantType.Byte)
177
    #elif args.datatype == "uint8":
178
        #return _arg_to_variant(val, array, int, ua.VariantType.Byte)
179
    elif args.datatype == "uint16":
180
        return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
181
    elif args.datatype == "uint32":
182
        return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
183
    elif args.datatype == "uint64":
184
        return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
185
    #elif args.datatype == "int8":
186
        #return ua.Variant(int(val), ua.VariantType.Int8)
187
    elif args.datatype == "int16":
188
        return _arg_to_variant(val, array, int, ua.VariantType.Int16)
189
    elif args.datatype == "int32":
190
        return _arg_to_variant(val, array, int, ua.VariantType.Int32)
191
    elif args.datatype == "int64":
192
        return _arg_to_variant(val, array, int, ua.VariantType.Int64)
193
    elif args.datatype == "float":
194
        return _arg_to_variant(val, array, float, ua.VariantType.Float)
195
    elif args.datatype == "double":
196
        return _arg_to_variant(val, array, float, ua.VariantType.Double)
197
    elif args.datatype == "string":
198
        return _arg_to_variant(val, array, str, ua.VariantType.String)
199
    elif args.datatype == "datetime":
200
        raise NotImplementedError
201
    elif args.datatype == "Guid":
202
        return _arg_to_variant(val, array, bytes, ua.VariantType.Guid)
203
    elif args.datatype == "ByteString":
204
        return _arg_to_variant(val, array, bytes, ua.VariantType.ByteString)
205
    elif args.datatype == "xml":
206
        return _arg_to_variant(val, array, str, ua.VariantType.XmlElement)
207
    elif args.datatype == "nodeid":
208
        return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
209
    elif args.datatype == "expandednodeid":
210
        return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
211
    elif args.datatype == "statuscode":
212
        return _arg_to_variant(val, array, int, ua.VariantType.StatusCode)
213
    elif args.datatype in ("qualifiedname", "browsename"):
214
        return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
215
    elif args.datatype == "LocalizedText":
216
        return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantType.LocalizedText)
217
218
219
def uawrite():
220
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
221
    add_common_args(parser)
222
    parser.add_argument("-a",
223
                        "--attribute",
224
                        dest="attribute",
225
                        type=int,
226
                        default=ua.AttributeIds.Value,
227
                        help="Set attribute to read")
228
    parser.add_argument("-l",
229
                        "--list",
230
                        "--array",
231
                        dest="array",
232
                        default="guess",
233
                        choices=["guess", "true", "false"],
234
                        help="Value is an array")
235
    parser.add_argument("-t",
236
                        "--datatype",
237
                        dest="datatype",
238
                        default="guess",
239
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
240
                        help="Data type to return")
241
    parser.add_argument("value",
242
                        help="Value to be written",
243
                        metavar="VALUE")
244
    args = parse_args(parser, requirenodeid=True)
245
246
    client = Client(args.url, timeout=args.timeout)
247
    client.set_security_string(args.security)
248
    client.connect()
249
    try:
250
        node = get_node(client, args)
251
        val = _val_to_variant(args.value, args)
252
        node.set_attribute(args.attribute, ua.DataValue(val))
253
    finally:
254
        client.disconnect()
255
    sys.exit(0)
256
    print(args)
257
258
259 View Code Duplication
def uals():
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
260
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
261
    add_common_args(parser)
262
    parser.add_argument("-l",
263
                        dest="long_format",
264
                        const=3,
265
                        nargs="?",
266
                        type=int,
267
                        help="use a long listing format")
268
    parser.add_argument("-d",
269
                        "--depth",
270
                        default=1,
271
                        type=int,
272
                        help="Browse depth")
273
274
    args = parse_args(parser)
275
    if args.long_format is None:
276
        args.long_format = 1
277
278
    client = Client(args.url, timeout=args.timeout)
279
    client.set_security_string(args.security)
280
    client.connect()
281
    try:
282
        node = get_node(client, args)
283
        print("Browsing node {} at {}\n".format(node, args.url))
284
        if args.long_format == 0:
285
            _lsprint_0(node, args.depth - 1)
286
        elif args.long_format == 1:
287
            _lsprint_1(node, args.depth - 1)
288
        else:
289
            _lsprint_long(node, args.depth - 1)
290
    finally:
291
        client.disconnect()
292
    sys.exit(0)
293
    print(args)
294
295
296
def _lsprint_0(node, depth, indent=""):
297
    if not indent:
298
        print("{:30} {:25}".format("DisplayName", "NodeId"))
299
        print("")
300
    for desc in node.get_children_descriptions():
301
        print("{}{:30} {:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
302
        if depth:
303
            _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
304
305
306
def _lsprint_1(node, depth, indent=""):
307
    if not indent:
308
        print("{:30} {:25} {:25} {:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
309
        print("")
310
311
    for desc in node.get_children_descriptions():
312
        if desc.NodeClass == ua.NodeClass.Variable:
313
            val = Node(node.server, desc.NodeId).get_value()
314
            print("{}{:30} {!s:25} {!s:25}, {!s:3}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string(), val))
315
        else:
316
            print("{}{:30} {!s:25} {!s:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string()))
317
        if depth:
318
            _lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
319
320
321
def _lsprint_long(pnode, depth, indent=""):
322
    if not indent:
323
        print("{:30} {:25} {:25} {:10} {:30} {:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
324
        print("")
325
    for node in pnode.get_children():
326
        attrs = node.get_attributes([ua.AttributeIds.DisplayName,
327
                                     ua.AttributeIds.BrowseName,
328
                                     ua.AttributeIds.NodeClass,
329
                                     ua.AttributeIds.WriteMask,
330
                                     ua.AttributeIds.UserWriteMask,
331
                                     ua.AttributeIds.DataType,
332
                                     ua.AttributeIds.Value])
333
        name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
334
        update = attrs[-1].ServerTimestamp
335
        if nclass == ua.NodeClass.Variable:
336
            print("{}{:30} {:25} {:25} {:10} {!s:30} {!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
337
        else:
338
            print("{}{:30} {:25} {:25}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
339
        if depth:
340
            _lsprint_long(node, depth - 1, indent + "  ")
341
342
343
class SubHandler(object):
344
345
    def datachange_notification(self, node, val, data):
346
        print("New data change event", node, val, data)
347
348
    def event_notification(self, event):
349
        print("New event", event)
350
351
352
def uasubscribe():
353
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
354
    add_common_args(parser)
355
    parser.add_argument("-t",
356
                        "--eventtype",
357
                        dest="eventtype",
358
                        default="datachange",
359
                        choices=['datachange', 'event'],
360
                        help="Event type to subscribe to")
361
362
    args = parse_args(parser, requirenodeid=False)
363
    if args.eventtype == "datachange":
364
        _require_nodeid(parser, args)
365
    else:
366
        # FIXME: this is broken, someone may have written i=84 on purpose
367
        if args.nodeid == "i=84" and args.path == "":
368
            args.nodeid = "i=2253"
369
370
    client = Client(args.url, timeout=args.timeout)
371
    client.set_security_string(args.security)
372
    client.connect()
373
    try:
374
        node = get_node(client, args)
375
        handler = SubHandler()
376
        sub = client.create_subscription(500, handler)
377
        if args.eventtype == "datachange":
378
            sub.subscribe_data_change(node)
379
        else:
380
            sub.subscribe_events(node)
381
        print("Type Ctr-C to exit")
382
        while True:
383
            time.sleep(1)
384
    finally:
385
        client.disconnect()
386
    sys.exit(0)
387
    print(args)
388
389
390
def application_to_strings(app):
391
    result = []
392
    result.append(('Application URI', app.ApplicationUri))
393
    optionals = [
394
        ('Product URI', app.ProductUri),
395
        ('Application Name', app.ApplicationName.to_string()),
396
        ('Application Type', str(app.ApplicationType)),
397
        ('Gateway Server URI', app.GatewayServerUri),
398
        ('Discovery Profile URI', app.DiscoveryProfileUri),
399
    ]
400
    for (n, v) in optionals:
401
        if v:
402
            result.append((n, v))
403
    for url in app.DiscoveryUrls:
404
        result.append(('Discovery URL', url))
405
    return result  # ['{}: {}'.format(n, v) for (n, v) in result]
406
407
408
def cert_to_string(der):
409
    if not der:
410
        return '[no certificate]'
411
    try:
412
        from opcua.crypto import uacrypto
413
    except ImportError:
414
        return "{} bytes".format(len(der))
415
    cert = uacrypto.x509_from_der(der)
416
    return uacrypto.x509_to_string(cert)
417
418
419
def endpoint_to_strings(ep):
420
    result = [('Endpoint URL', ep.EndpointUrl)]
421
    result += application_to_strings(ep.Server)
422
    result += [
423
        ('Server Certificate', cert_to_string(ep.ServerCertificate)),
424
        ('Security Mode', str(ep.SecurityMode)),
425
        ('Security Policy URI', ep.SecurityPolicyUri)]
426
    for tok in ep.UserIdentityTokens:
427
        result += [
428
            ('User policy', tok.PolicyId),
429
            ('  Token type', str(tok.TokenType))]
430
        if tok.IssuedTokenType or tok.IssuerEndpointUrl:
431
            result += [
432
                ('  Issued Token type', tok.IssuedTokenType),
433
                ('  Issuer Endpoint URL', tok.IssuerEndpointUrl)]
434
        if tok.SecurityPolicyUri:
435
            result.append(('  Security Policy URI', tok.SecurityPolicyUri))
436
    result += [
437
        ('Transport Profile URI', ep.TransportProfileUri),
438
        ('Security Level', ep.SecurityLevel)]
439
    return result
440
441
442
def uaclient():
443
    parser = argparse.ArgumentParser(description="Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
444
    add_common_args(parser)
445
    parser.add_argument("-c",
446
                        "--certificate",
447
                        help="set client certificate")
448
    parser.add_argument("-k",
449
                        "--private_key",
450
                        help="set client private key")
451
    args = parse_args(parser)
452
453
    client = Client(args.url, timeout=args.timeout)
454
    client.set_security_string(args.security)
455
    if args.certificate:
456
        client.load_client_certificate(args.certificate)
457
    if args.private_key:
458
        client.load_private_key(args.private_key)
459
    client.connect()
460
    try:
461
        root = client.get_root_node()
462
        objects = client.get_objects_node()
463
        mynode = get_node(client, args)
464
        embed()
465
    finally:
466
        client.disconnect()
467
    sys.exit(0)
468
469
470
def uaserver():
471
    parser = argparse.ArgumentParser(description="Run an example OPC-UA server. By importing xml definition and using uawrite command line, it is even possible to expose real data using this server")
472
    # we setup a server, this is a bit different from other tool so we do not reuse common arguments
473
    parser.add_argument("-u",
474
                        "--url",
475
                        help="URL of OPC UA server, default is opc.tcp://0.0.0.0:4840",
476
                        default='opc.tcp://0.0.0.0:4840',
477
                        metavar="URL")
478
    parser.add_argument("-v",
479
                        "--verbose",
480
                        dest="loglevel",
481
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
482
                        default='WARNING',
483
                        help="Set log level")
484
    parser.add_argument("-x",
485
                        "--xml",
486
                        metavar="XML_FILE",
487
                        help="Populate address space with nodes defined in XML")
488
    parser.add_argument("-p",
489
                        "--populate",
490
                        action="store_true",
491
                        help="Populate address space with some sample nodes")
492
    parser.add_argument("-c",
493
                        "--disable-clock",
494
                        action="store_true",
495
                        help="Disable clock, to avoid seeing many write if debugging an application")
496
    parser.add_argument("-s",
497
                        "--shell",
498
                        action="store_true",
499
                        help="Start python shell instead of randomly changing node values")
500
    parser.add_argument("--certificate",
501
                        help="set server certificate")
502
    parser.add_argument("--private_key",
503
                        help="set server private key")
504
    args = parser.parse_args()
505
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
506
507
    server = Server()
508
    server.set_endpoint(args.url)
509
    if args.certificate:
510
        server.load_certificate(args.certificate)
511
    if args.private_key:
512
        server.load_private_key(args.private_key)
513
    server.disable_clock(args.disable_clock)
514
    server.set_server_name("FreeOpcUa Example Server")
515
    if args.xml:
516
        server.import_xml(args.xml)
517
    if args.populate:
518
        @uamethod
519
        def multiply(parent, x, y):
520
            print("multiply method call with parameters: ", x, y)
521
            return x * y
522
523
        uri = "http://examples.freeopcua.github.io"
524
        idx = server.register_namespace(uri)
525
        objects = server.get_objects_node()
526
        myobj = objects.add_object(idx, "MyObject")
527
        mywritablevar = myobj.add_variable(idx, "MyWritableVariable", 6.7)
528
        mywritablevar.set_writable()    # Set MyVariable to be writable by clients
529
        myvar = myobj.add_variable(idx, "MyVariable", 6.7)
530
        myarrayvar = myobj.add_variable(idx, "MyVarArray", [6.7, 7.9])
531
        myprop = myobj.add_property(idx, "MyProperty", "I am a property")
532
        mymethod = myobj.add_method(idx, "MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])
533
534
    server.start()
535
    try:
536
        if args.shell:
537
            embed()
538
        elif args.populate:
539
            count = 0
540
            while True:
541
                time.sleep(1)
542
                myvar.set_value(math.sin(count / 10))
543
                myarrayvar.set_value([math.sin(count / 10), math.sin(count / 100)])
544
                count += 1
545
        else:
546
            while True:
547
                time.sleep(1)
548
    finally:
549
        server.stop()
550
    sys.exit(0)
551
552
553
def uadiscover():
554
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
555
    add_minimum_args(parser)
556
    parser.add_argument("-n",
557
                        "--network",
558
                        action="store_true",
559
                        help="Also send a FindServersOnNetwork request to server")
560
    #parser.add_argument("-s",
561
                        #"--servers",
562
                        #action="store_false",
563
                        #help="send a FindServers request to server")
564
    #parser.add_argument("-e",
565
                        #"--endpoints",
566
                        #action="store_false",
567
                        #help="send a GetEndpoints request to server")
568
    args = parse_args(parser)
569
570
    client = Client(args.url, timeout=args.timeout)
571
572
    if args.network:
573
        print("Performing discovery at {}\n".format(args.url))
574
        for i, server in enumerate(client.connect_and_find_servers_on_network(), start=1):
575
            print('Server {}:'.format(i))
576
            #for (n, v) in application_to_strings(server):
577
                #print('  {}: {}'.format(n, v))
578
            print('')
579
580
    print("Performing discovery at {}\n".format(args.url))
581
    for i, server in enumerate(client.connect_and_find_servers(), start=1):
582
        print('Server {}:'.format(i))
583
        for (n, v) in application_to_strings(server):
584
            print('  {}: {}'.format(n, v))
585
        print('')
586
587
    for i, ep in enumerate(client.connect_and_get_server_endpoints(), start=1):
588
        print('Endpoint {}:'.format(i))
589
        for (n, v) in endpoint_to_strings(ep):
590
            print('  {}: {}'.format(n, v))
591
        print('')
592
593
    sys.exit(0)
594
595
596
def print_history(o):
597
    if isinstance(o, ua.HistoryData):
598
        print("{:30} {:10} {}".format('Source timestamp', 'Status', 'Value'))
599
        for d in o.DataValues:
600
            print("{:30} {:10} {}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
601
602
603
def str_to_datetime(s, default=None):
604
    if not s:
605
        if default is not None:
606
            return default
607
        return datetime.utcnow()
608
    # FIXME: try different datetime formats
609
    for fmt in ["%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"]:
610
        try:
611
            return datetime.strptime(s, fmt)
612
        except ValueError:
613
            pass
614
615
616
def uahistoryread():
617
    parser = argparse.ArgumentParser(description="Read history of a node")
618
    add_common_args(parser)
619
    parser.add_argument("--starttime",
620
                        default=None,
621
                        help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time - one day")
622
    parser.add_argument("--endtime",
623
                        default=None,
624
                        help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
625
    parser.add_argument("-e",
626
                        "--events",
627
                        action="store_true",
628
                        help="Read event history instead of data change history")
629
    parser.add_argument("-l",
630
                        "--limit",
631
                        type=int,
632
                        default=10,
633
                        help="Maximum number of notfication to return")
634
635
    args = parse_args(parser, requirenodeid=True)
636
637
    client = Client(args.url, timeout=args.timeout)
638
    client.set_security_string(args.security)
639
    client.connect()
640
    try:
641
        node = get_node(client, args)
642
        starttime = str_to_datetime(args.starttime, datetime.utcnow() - timedelta(days=1))
643
        endtime = str_to_datetime(args.endtime, datetime.utcnow())
644
        print("Reading raw history of node {} at {}; start at {}, end at {}\n".format(node, args.url, starttime, endtime))
645
        if args.events:
646
            evs = node.read_event_history(starttime, endtime, numvalues=args.limit)
647
            for ev in evs:
648
                print(ev)
649
        else:
650
            print_history(node.read_raw_history(starttime, endtime, numvalues=args.limit))
651
    finally:
652
        client.disconnect()
653
    sys.exit(0)
654
655
656
def uacall():
657
    parser = argparse.ArgumentParser(description="Call method of a node")
658
    add_common_args(parser)
659
    parser.add_argument("-m",
660
                        "--method",
661
                        dest="method",
662
                        type=int,
663
                        default=None,
664
                        help="Set method to call. If not given then (single) method of the selected node is used.")
665
    parser.add_argument("-l",
666
                        "--list",
667
                        "--array",
668
                        dest="array",
669
                        default="guess",
670
                        choices=["guess", "true", "false"],
671
                        help="Value is an array")
672
    parser.add_argument("-t",
673
                        "--datatype",
674
                        dest="datatype",
675
                        default="guess",
676
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
677
                        help="Data type to return")
678
    parser.add_argument("value",
679
                        help="Value to use for call to method, if any",
680
                        nargs="?",
681
                        metavar="VALUE")
682
683
    args = parse_args(parser, requirenodeid=True)
684
685
    client = Client(args.url, timeout=args.timeout)
686
    client.set_security_string(args.security)
687
    client.connect()
688
    try:
689
        node = get_node(client, args)
690
        # val must be a tuple in order to enable method calls without arguments
691
        if ( args.value is None ):
692
            val = () #empty tuple
693
        else:
694
            val = (_val_to_variant(args.value, args),) # tuple with one element
695
696
        # determine method to call: Either explicitly given or automatically select the method of the selected node.
697
        methods = node.get_methods()
698
        method_id = None
699
        #print( "methods=%s" % (methods) )
700
701
        if ( args.method is None ):
702
            if ( len( methods ) == 0 ):
703
                raise ValueError( "No methods in selected node and no method given" )
704
            elif ( len( methods ) == 1 ):
705
                method_id = methods[0]
706
            else:
707
                raise ValueError( "Selected node has %d methods but no method given. Provide one of %s" % (methods) )
708
        else:
709
            for m in methods:
710
                if ( m.nodeid.Identifier == args.method ):
711
                    method_id = m.nodeid
712
                    break
713
714
        if ( method_id is None):
715
            # last resort:
716
            method_id = ua.NodeId( identifier=args.method )#, namespaceidx=? )#, nodeidtype=?): )
717
718
        #print( "method_id=%s\nval=%s" % (method_id,val) )
719
720
        result_variants = node.call_method( method_id, *val )
721
        print( "resulting result_variants=%s" % result_variants )
722
    finally:
723
        client.disconnect()
724
    sys.exit(0)
725
    print(args)
726