Completed
Push — master ( 18f8bd...e443b4 )
by Olivier
04:56 queued 01:32
created

opcua.str_to_datetime()   A

Complexity

Conditions 4

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 9
rs 9.2
cc 4
1
import logging
2
import sys
3
import argparse
4
from opcua import ua, Client
5
import code
6
from enum import Enum
7
from datetime import datetime
8
9
10
def add_minimum_args(parser):
11
    parser.add_argument("-u",
12
                        "--url",
13
                        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
14
                        default='opc.tcp://localhost:4841',
15
                        metavar="URL")
16
    parser.add_argument("-v",
17
                        "--verbose",
18
                        dest="loglevel",
19
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
20
                        default='WARNING',
21
                        help="Set log level")
22
    parser.add_argument("--timeout",
23
                        dest="timeout",
24
                        type=int,
25
                        default=1,
26
                        help="Set socket timeout (NOT the diverse UA timeouts)")
27
28
29
def add_common_args(parser):
30
    add_minimum_args(parser)
31
    parser.add_argument("-n",
32
                        "--nodeid",
33
                        help="Fully-qualified node ID (for example: i=85). Default: root node",
34
                        default='i=84',
35
                        metavar="NODE")
36
    parser.add_argument("-p",
37
                        "--path",
38
                        help="Comma separated browse path to the node starting at nodeid (for example: 3:Mybject,3:MyVariable)",
39
                        default='',
40
                        metavar="BROWSEPATH")
41
    parser.add_argument("-i",
42
                        "--namespace",
43
                        help="Default namespace",
44
                        type=int,
45
                        default=0,
46
                        metavar="NAMESPACE")
47
48
49
def get_node(client, args):
50
    node = client.get_node(args.nodeid)
51
    if args.path:
52
        node = node.get_child(args.path.split(","))
53
54
55
def uaread():
56
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
57
    add_common_args(parser)
58
    parser.add_argument("-a",
59
                        "--attribute",
60
                        dest="attribute",
61
                        type=int,
62
                        #default="VALUE",
63
                        #choices=['VALUE', 'NODEID', 'BROWSENAME', 'ERROR', 'CRITICAL'],
64
                        default=ua.AttributeIds.Value,
65
                        help="Set attribute to read")
66
    parser.add_argument("-t",
67
                        "--datatype",
68
                        dest="datatype",
69
                        default="python",
70
                        choices=['python', 'variant', 'datavalue'],
71
                        help="Data type to return")
72
73
    args = parser.parse_args()
74
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
75
        parser.print_usage()
76
        print("uaread: error: A NodeId or BrowsePath is required")
77
        sys.exit(1)
78
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
79
80
    client = Client(args.url, timeout=args.timeout)
81
    client.connect()
82
    try:
83
        node = client.get_node(args.nodeid)
84
        if args.path:
85
            node = node.get_child(args.path.split(","))
86
        attr = node.get_attribute(args.attribute)
87
        if args.datatype == "python":
88
            print(attr.Value.Value)
89
        elif args.datatype == "variant":
90
            print(attr.Value)
91
        else:
92
            print(attr)
93
    finally:
94
        client.disconnect()
95
    sys.exit(0)
96
    print(args)
97
98
99
def _args_to_array(val, array):
100
    if array == "guess":
101
        if "," in val:
102
            array = "true"
103
    if array == "true":
104
        val = val.split(",")
105
    return val
106
107
108
def _arg_to_bool(val):
109
    if val in ("true", "True"):
110
        return True
111
    else:
112
        return False
113
114
115
def _arg_to_variant(val, array, ptype, varianttype=None):
116
    val = _args_to_array(val, array)
117
    if isinstance(val, list):
118
        val = [ptype(i) for i in val]
119
    else:
120
        val = ptype(val)
121
    if varianttype:
122
        return ua.Variant(val, varianttype)
123
    else:
124
        return ua.Variant(val)
125
126
127
def _val_to_variant(val, args):
128
    array = args.array
129
    if args.datatype == "guess":
130
        if val in ("true", "True", "false", "False"):
131
            return _arg_to_variant(val, array, _arg_to_bool)
132
        # FIXME: guess bool value
133
        try:
134
            return _arg_to_variant(val, array, int)
135
        except ValueError:
136
            try:
137
                return _arg_to_variant(val, array, float)
138
            except ValueError:
139
                return _arg_to_variant(val, array, str)
140
    elif args.datatype == "bool":
141
        if val in ("1", "True", "true"):
142
            return ua.Variant(True, ua.VariantType.Boolean)
143
        else:
144
            return ua.Variant(False, ua.VariantType.Boolean)
145
    elif args.datatype == "sbyte":
146
        return _arg_to_variant(val, array, int, ua.VariantType.SByte)
147
    elif args.datatype == "byte":
148
        return _arg_to_variant(val, array, int, ua.VariantType.Byte)
149
    #elif args.datatype == "uint8":
150
        #return _arg_to_variant(val, array, int, ua.VariantType.Byte)
151
    elif args.datatype == "uint16":
152
        return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
153
    elif args.datatype == "uint32":
154
        return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
155
    elif args.datatype == "uint64":
156
        return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
157
    #elif args.datatype == "int8":
158
        #return ua.Variant(int(val), ua.VariantType.Int8)
159
    elif args.datatype == "int16":
160
        return _arg_to_variant(val, array, int, ua.VariantType.Int16)
161
    elif args.datatype == "int32":
162
        return _arg_to_variant(val, array, int, ua.VariantType.Int32)
163
    elif args.datatype == "int64":
164
        return _arg_to_variant(val, array, int, ua.VariantType.Int64)
165
    elif args.datatype == "float":
166
        return _arg_to_variant(val, array, float, ua.VariantType.Float)
167
    elif args.datatype == "double":
168
        return _arg_to_variant(val, array, float, ua.VariantType.Double)
169
    elif args.datatype == "string":
170
        return _arg_to_variant(val, array, str, ua.VariantType.String)
171
    elif args.datatype == "datetime":
172
        raise NotImplementedError
173
    elif args.datatype == "Guid":
174
        return _arg_to_variant(val, array, bytes, ua.VariantType.Guid)
175
    elif args.datatype == "ByteString":
176
        return _arg_to_variant(val, array, bytes, ua.VariantType.ByteString)
177
    elif args.datatype == "xml":
178
        return _arg_to_variant(val, array, str, ua.VariantType.XmlElement)
179
    elif args.datatype == "nodeid":
180
        return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
181
    elif args.datatype == "expandednodeid":
182
        return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
183
    elif args.datatype == "statuscode":
184
        return _arg_to_variant(val, array, int, ua.VariantType.StatusCode)
185
    elif args.datatype in ("qualifiedname", "browsename"):
186
        return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
187
    elif args.datatype == "LocalizedText":
188
        return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantTypeLocalizedText)
189
190
191
def uawrite():
192
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
193
    add_common_args(parser)
194
    parser.add_argument("-a",
195
                        "--attribute",
196
                        dest="attribute",
197
                        type=int,
198
                        #default="VALUE",
199
                        #choices=['VALUE', 'NODEID', 'BROWSENAME', 'ERROR', 'CRITICAL'],
200
                        default=ua.AttributeIds.Value,
201
                        help="Set attribute to read")
202
    parser.add_argument("-l",
203
                        "--list",
204
                        "--array",
205
                        dest="array",
206
                        default="guess",
207
                        choices=["guess", "true", "false"],
208
                        help="Value is an array")
209
    parser.add_argument("-t",
210
                        "--datatype",
211
                        dest="datatype",
212
                        default="guess",
213
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],  
214
                        help="Data type to return")
215
    parser.add_argument("value",
216
                        help="Value to be written",
217
                        metavar="VALUE")
218
    args = parser.parse_args()
219
    if args.nodeid == "i=84" and args.path == "" and args.attribute == ua.AttributeIds.Value:
220
        parser.print_usage()
221
        print("uaread: error: A NodeId or BrowsePath is required")
222
        sys.exit(1)
223
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
224
225
    client = Client(args.url, timeout=args.timeout)
226
    client.connect()
227
    try:
228
        node = client.get_node(args.nodeid)
229
        if args.path:
230
            node = node.get_child(args.path.split(","))
231
        val = _val_to_variant(args.value, args)
232
        node.set_attribute(args.attribute, ua.DataValue(val))
233
    finally:
234
        client.disconnect()
235
    sys.exit(0)
236
    print(args)
237
238
239
def uals():
240
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
241
    add_common_args(parser)
242
    #parser.add_argument("-l",
243
                        #dest="long_format",
244
                        #default=ua.AttributeIds.Value,
245
                        #help="use a long listing format")
246
    parser.add_argument("-d",
247
                        "--depth",
248
                        default=1,
249
                        type=int,
250
                        help="Browse depth")
251
252
    args = parser.parse_args()
253
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
254
255
    client = Client(args.url, timeout=args.timeout)
256
    client.connect()
257
    try:
258
        node = client.get_node(args.nodeid)
259
        if args.path:
260
            node = node.get_child(args.path.split(","))
261
        print("Browsing node {} at {}\n".format(node, args.url))
262
        _lsprint(client, node.nodeid, args.depth - 1)
263
264
    finally:
265
        client.disconnect()
266
    sys.exit(0)
267
    print(args)
268
269
270
def _lsprint(client, nodeid, depth, indent=""):
271
    indent += "    "
272
    pnode = client.get_node(nodeid)
273
    for desc in pnode.get_children_descriptions():
274
        print("{} {}, {}, {}".format(indent, desc.DisplayName.to_string(), desc.BrowseName.to_string(), desc.NodeId.to_string()))
275
        if depth:
276
            _lsprint(client, desc.NodeId, depth - 1, indent)
277
278
279
class SubHandler(object):
280
281
    def data_change(self, handle, node, val, attr):
282
        print("New data change event", handle, node, val, attr)
283
284
    def event(self, handle, event):
285
        print("New event", handle, event)
286
287
288
def uasubscribe():
289
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
290
    add_common_args(parser)
291
    parser.add_argument("-t",
292
                        "--eventtype",
293
                        dest="eventtype",
294
                        default="datachange",
295
                        choices=['datachange', 'event'],
296
                        help="Event type to subscribe to")
297
298
    args = parser.parse_args()
299
    if args.nodeid == "i=84" and args.path == "":
300
        parser.print_usage()
301
        print("uaread: error: The NodeId or BrowsePath of a variable is required")
302
        sys.exit(1)
303
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
304
305
    client = Client(args.url, timeout=args.timeout)
306
    client.connect()
307
    try:
308
        node = client.get_node(args.nodeid)
309
        if args.path:
310
            node = node.get_child(args.path.split(","))
311
        handler = SubHandler()
312
        sub = client.create_subscription(500, handler)
313
        if args.eventtype == "datachange":
314
            sub.subscribe_data_change(node)
315
        else:
316
            sub.subscribe_events(node)
317
        glbs = globals()
318
        glbs.update(locals())
319
        shell = code.InteractiveConsole(vars)
320
        shell.interact()
321
    finally:
322
        client.disconnect()
323
    sys.exit(0)
324
    print(args)
325
326
327
# converts numeric value to its enum name.
328
def enum_to_string(klass, value):
329
    if isinstance(value, Enum):
330
        return value.name
331
    # if value is not a subtype of Enum, try to find a constant
332
    # with this value in this class
333
    for k, v in vars(klass).items():
334
        if not k.startswith('__') and v == value:
335
            return k
336
    return 'Unknown {} ({})'.format(klass.__name__, value)
337
338
339
def application_to_strings(app):
340
    result = []
341
    result.append(('Application URI', app.ApplicationUri))
342
    optionals = [
343
        ('Product URI', app.ProductUri),
344
        ('Application Name', app.ApplicationName.to_string()),
345
        ('Application Type', enum_to_string(ua.ApplicationType, app.ApplicationType)),
346
        ('Gateway Server URI', app.GatewayServerUri),
347
        ('Discovery Profile URI', app.DiscoveryProfileUri),
348
    ]
349
    for (n, v) in optionals:
350
        if v:
351
            result.append((n, v))
352
    for url in app.DiscoveryUrls:
353
        result.append(('Discovery URL', url))
354
    return result  # ['{}: {}'.format(n, v) for (n, v) in result]
355
356
357
def endpoint_to_strings(ep):
358
    result = [('Endpoint URL', ep.EndpointUrl)]
359
    result += application_to_strings(ep.Server)
360
    result += [
361
        ('Server Certificate', len(ep.ServerCertificate)),
362
        ('Security Mode', enum_to_string(ua.MessageSecurityMode, ep.SecurityMode)),
363
        ('Security Policy URI', ep.SecurityPolicyUri)]
364
    for tok in ep.UserIdentityTokens:
365
        result += [
366
            ('User policy', tok.PolicyId),
367
            ('  Token type', enum_to_string(ua.UserTokenType, tok.TokenType))]
368
        if tok.IssuedTokenType or tok.IssuerEndpointUrl:
369
            result += [
370
                ('  Issued Token type', tok.IssuedTokenType),
371
                ('  Issuer Endpoint URL', tok.IssuerEndpointUrl)]
372
        if tok.SecurityPolicyUri:
373
            result.append(('  Security Policy URI', tok.SecurityPolicyUri))
374
    result += [
375
        ('Transport Profile URI', ep.TransportProfileUri),
376
        ('Security Level', ep.SecurityLevel)]
377
    return result
378
379
380
def uadiscover():
381
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
382
    add_minimum_args(parser)
383
    args = parser.parse_args()
384
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
385
386
    client = Client(args.url, timeout=args.timeout)
387
    print("Performing discovery at {}\n".format(args.url))
388
    for i, server in enumerate(client.find_all_servers(), start=1):
389
        print('Server {}:'.format(i))
390
        for (n, v) in application_to_strings(server):
391
            print('  {}: {}'.format(n, v))
392
        print('')
393
394
    client = Client(args.url, timeout=args.timeout)
395
    for i, ep in enumerate(client.get_server_endpoints(), start=1):
396
        print('Endpoint {}:'.format(i))
397
        for (n, v) in endpoint_to_strings(ep):
398
            print('  {}: {}'.format(n, v))
399
        print('')
400
401
    sys.exit(0)
402
403
def print_history(res):
404
    if res.TypeId.Identifier == ua.ObjectIds.HistoryData_Encoding_DefaultBinary:
405
        buf = ua.utils.Buffer(res.Body)
406
        print("{:30} {:10} {}".format('Source timestamp', 'Status', 'Value'))
407
        for d in ua.HistoryData.from_binary(buf).DataValues:
408
            print("{:30} {:10} {}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
409
410
def str_to_datetime(s):
411
    if not s:
412
        return datetime.utcnow()
413
    # try different datetime formats
414
    for fmt in ["%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"]:
415
        try:
416
            return datetime.strptime(s, fmt)
417
        except ValueError:
418
            pass
419
420
def uahistoryread():
421
    parser = argparse.ArgumentParser(description="Read history of a node")
422
    add_common_args(parser)
423
    parser.add_argument("--starttime",
424
                        default="",
425
                        help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
426
    parser.add_argument("--endtime",
427
                        default="",
428
                        help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
429
430
    args = parser.parse_args()
431
    if args.nodeid == "i=84" and args.path == "":
432
        parser.print_usage()
433
        print("uahistoryread: error: A NodeId or BrowsePath is required")
434
        sys.exit(1)
435
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
436
437
    client = Client(args.url, timeout=args.timeout)
438
    client.connect()
439
    try:
440
        node = client.get_node(args.nodeid)
441
        if args.path:
442
            node = node.get_child(args.path.split(","))
443
        starttime = str_to_datetime(args.starttime)
444
        endtime = str_to_datetime(args.endtime)
445
        print("Reading raw history of node {} at {}; start at {}, end at {}\n".format(node, args.url, starttime, endtime))
446
        print_history(node.read_raw_history(starttime, endtime))
447
    finally:
448
        client.disconnect()
449
    sys.exit(0)
450