Passed
Pull Request — master (#120)
by Olivier
02:58
created

asyncua.tools   F

Complexity

Total Complexity 139

Size/Duplication

Total Lines 790
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 638
dl 0
loc 790
rs 1.962
c 0
b 0
f 0
wmc 139

38 Functions

Rating   Name   Duplication   Size   Complexity  
A embed() 0 2 1
A add_common_args() 0 26 1
A _require_nodeid() 0 6 3
A run() 0 2 1
A add_minimum_args() 0 17 1
A _arg_to_bool() 0 2 1
A _configure_client_with_args() 0 6 3
A uasubscribe() 0 2 1
A uacall() 0 2 1
A _uals() 0 33 4
A get_node() 0 9 4
A print_history() 0 4 2
F _val_to_variant() 0 61 27
B _uadiscover() 0 41 7
A _args_to_array() 0 7 4
A uadiscover() 0 2 1
A uals() 0 2 1
A parse_args() 0 10 4
B _lsprint_1() 0 16 6
B _uasubscribe() 0 34 6
A _lsprint_0() 0 8 4
C uaserver() 0 83 9
B _lsprint_long() 0 20 5
A cert_to_string() 0 9 3
B endpoint_to_strings() 0 21 5
A str_to_datetime() 0 11 5
A application_to_strings() 0 16 4
A _arg_to_variant() 0 10 3
A uaclient() 0 27 3
A uageneratestructs() 0 2 1
B _uacall() 0 54 4
A _uageneratestructs() 0 22 1
A uaread_value() 0 2 1
A _uawrite_value() 0 36 1
A uahistoryread_value() 0 2 1
A _uahistoryread_value() 0 38 3
A _uaread_value() 0 33 3
A uawrite_value() 0 2 1

2 Methods

Rating   Name   Duplication   Size   Complexity  
A SubHandler.datachange_notification() 0 2 1
A SubHandler.event_notification() 0 2 1

How to fix

Complexity

Complex classes like asyncua.tools often do many different things. To break such a class down, identify a cohesive component within it. A common way to find one is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
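
For a module of plain functions, the same idea can be applied to a single function. Most of the complexity score here comes from _val_to_variant() (complexity 27), which is one long if/elif chain keyed on a datatype name. A minimal sketch of a table-driven alternative follows; `VariantKind`, `CONVERTERS` and `convert()` are hypothetical stand-ins so that the example runs without asyncua, and they are not part of the reviewed code.

```python
from enum import Enum


class VariantKind(Enum):
    """Hypothetical stand-in for ua.VariantType, so the sketch runs without asyncua."""
    BYTE = "byte"
    INT32 = "int32"
    DOUBLE = "double"
    STRING = "string"


# One table row per datatype name replaces one elif branch.
CONVERTERS = {
    "byte": (int, VariantKind.BYTE),
    "int32": (int, VariantKind.INT32),
    "double": (float, VariantKind.DOUBLE),
    "string": (str, VariantKind.STRING),
}


def convert(text, datatype):
    """Convert a command-line string to a (value, kind) pair via table lookup."""
    try:
        ptype, kind = CONVERTERS[datatype]
    except KeyError:
        raise ValueError("unsupported datatype: {0}".format(datatype)) from None
    return ptype(text), kind


print(convert("42", "int32"))
```

With a table like this, an unknown datatype name raises an error instead of falling through the chain and returning None.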

1
import asyncio
Coding Style introduced by
This module should have a docstring.

The coding style of this project requires a docstring on this code element. Below is an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

To learn more about docstrings, we recommend reading PEP 257: Docstring Conventions.
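
Since this finding concerns a module rather than a method, the docstring would go at the very top of the file, before the imports. A minimal sketch, assuming an illustrative docstring text (the wording is not from the project); `ast.get_docstring` is used only to show that Python recognises the string as the module docstring.

```python
import ast

# Hypothetical first lines of a module; the docstring text is illustrative only.
MODULE_SOURCE = '''"""Command-line tools for OPC UA clients and servers."""

import asyncio
import logging
'''


def module_docstring(source):
    """Return the module-level docstring found in the given source text, or None."""
    return ast.get_docstring(ast.parse(source))


print(module_docstring(MODULE_SOURCE))
```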

2
import logging
3
import sys
4
import argparse
5
from datetime import datetime, timedelta
6
import math
7
import time
8
9
try:
10
    from IPython import embed
11
except ImportError:
12
    import code
13
14
    def embed():
15
        code.interact(local=dict(globals(), **locals()))
Comprehensibility Best Practice introduced by
The variable code does not seem to be defined.
16
17
from asyncua import ua
18
from asyncua import Client
19
from asyncua import Node, uamethod
20
from asyncua import sync
21
from asyncua.ua.uaerrors import UaStatusCodeError
22
23
24
if sys.version_info.major < 3:
25
    raise ValueError("This is a python 3 application")
26
if sys.version_info.minor >= 7:
27
    def run(coro):
28
        return asyncio.run(coro)
29
else:
30
    def run(coro):
31
        loop = asyncio.get_event_loop()
32
        return loop.run_until_complete(coro)
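
The two run() definitions above select an implementation by interpreter version. A possible alternative, sketched here under the assumption that feature detection is acceptable for this project, is a single function that checks whether asyncio.run exists; `answer()` is a hypothetical coroutine used only to exercise it.

```python
import asyncio


def run(coro):
    """Run a coroutine to completion and return its result."""
    if hasattr(asyncio, "run"):  # asyncio.run was added in Python 3.7
        return asyncio.run(coro)
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro)


async def answer():
    """Hypothetical coroutine used to exercise run()."""
    await asyncio.sleep(0)
    return 42


print(run(answer()))
```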
33
34
35
def add_minimum_args(parser):
36
    parser.add_argument("-u",
37
                        "--url",
38
                        help="URL of OPC UA server (for example: opc.tcp://example.org:4840)",
39
                        default='opc.tcp://localhost:4840',
40
                        metavar="URL")
41
    parser.add_argument("-v",
42
                        "--verbose",
43
                        dest="loglevel",
44
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
45
                        default='WARNING',
46
                        help="Set log level")
47
    parser.add_argument("--timeout",
48
                        dest="timeout",
49
                        type=int,
50
                        default=1,
51
                        help="Set socket timeout (NOT the various UA timeouts)")
52
53
54
def add_common_args(parser, default_node='i=84', require_node=False):
55
    add_minimum_args(parser)
56
    parser.add_argument("-n",
57
                        "--nodeid",
58
                        help="Fully-qualified node ID (for example: i=85). Default: root node",
59
                        default=default_node,
60
                        required=require_node,
61
                        metavar="NODE")
62
    parser.add_argument("-p",
63
                        "--path",
64
                        help="Comma separated browse path to the node starting at NODE (for example: 3:Mybject,3:MyVariable)",
Coding Style introduced by
This line is too long as per the coding-style (126/100).

This check looks for lines that are too long. You can specify the maximum line length.

65
                        default='',
66
                        metavar="BROWSEPATH")
67
    parser.add_argument("-i",
68
                        "--namespace",
69
                        help="Default namespace",
70
                        type=int,
71
                        default=0,
72
                        metavar="NAMESPACE")
73
    parser.add_argument("--security",
74
                        help="Security settings, for example: Basic256Sha256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None",
Coding Style introduced by
This line is too long as per the coding-style (142/100).

This check looks for lines that are too long. You can specify the maximum line length.

75
                        default='')
76
    parser.add_argument("--user",
77
                        help="User name for authentication. Overrides the user name given in the URL.")
Coding Style introduced by
This line is too long as per the coding-style (103/100).

This check looks for lines that are too long. You can specify the maximum line length.

78
    parser.add_argument("--password",
79
                        help="Password name for authentication. Overrides the password given in the URL.")
Coding Style introduced by
This line is too long as per the coding-style (106/100).

This check looks for lines that are too long. You can specify the maximum line length.
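
One way to address the long help= strings flagged in add_common_args() is to split them into adjacent string literals, which Python joins at compile time. A minimal sketch using only argparse; the option mirrors --path from the listing, and `PATH_HELP` is a hypothetical name introduced for the example.

```python
import argparse

# Adjacent string literals are joined at compile time, so the help text
# is one string while each source line stays under the length limit.
PATH_HELP = ("Comma separated browse path to the node starting at NODE "
             "(for example: 3:MyObject,3:MyVariable)")

parser = argparse.ArgumentParser()
parser.add_argument("-p",
                    "--path",
                    help=PATH_HELP,
                    default='',
                    metavar="BROWSEPATH")

args = parser.parse_args(["--path", "3:MyObject,3:MyVariable"])
print(args.path.split(","))
```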

80
81
82
def _require_nodeid(parser, args):
83
    # check that a nodeid has been given explicitly, a bit hackish...
84
    if args.nodeid == "i=84" and args.path == "":
85
        parser.print_usage()
86
        print("{0}: error: A NodeId or BrowsePath is required".format(parser.prog))
87
        sys.exit(1)
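
The check above cannot tell an omitted --nodeid from an explicit "i=84", which the code comment itself calls hackish. A minimal sketch of one alternative, assuming the parser default is changed to None and the root node is applied afterwards; `build_parser()`, `node_was_given()` and `effective_nodeid()` are hypothetical helpers, not functions of the reviewed module.

```python
import argparse

ROOT_NODE = "i=84"


def build_parser():
    """Build a reduced parser with only the two node-selection options."""
    parser = argparse.ArgumentParser(prog="uatool")  # hypothetical program name
    # default=None lets us tell "not given" apart from an explicit "i=84".
    parser.add_argument("-n", "--nodeid", default=None, metavar="NODE")
    parser.add_argument("-p", "--path", default="", metavar="BROWSEPATH")
    return parser


def node_was_given(args):
    """Return True if the user supplied a node ID or a browse path."""
    return args.nodeid is not None or args.path != ""


def effective_nodeid(args, fallback=ROOT_NODE):
    """Apply the fallback only when no node ID was typed."""
    return fallback if args.nodeid is None else args.nodeid


parser = build_parser()
print(node_was_given(parser.parse_args([])))
print(node_was_given(parser.parse_args(["-n", "i=84"])))
```

The same sentinel would also let a caller substitute a different fallback node when none was typed, without overriding a deliberate "i=84".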
88
89
90
def parse_args(parser, requirenodeid=False):
91
    args = parser.parse_args()
92
    #logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
93
    logging.basicConfig(level=getattr(logging, args.loglevel))
94
    if args.url and '://' not in args.url:
95
        logging.info("Adding default scheme %s to URL %s", ua.OPC_TCP_SCHEME, args.url)
96
        args.url = ua.OPC_TCP_SCHEME + '://' + args.url
97
    if requirenodeid:
98
        _require_nodeid(parser, args)
99
    return args
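
The scheme-defaulting step in parse_args() can be looked at in isolation. A minimal sketch, assuming ua.OPC_TCP_SCHEME has the value "opc.tcp" (an assumption, since the constant is defined outside this file); `with_default_scheme()` is a hypothetical helper.

```python
OPC_TCP_SCHEME = "opc.tcp"  # assumed value of ua.OPC_TCP_SCHEME


def with_default_scheme(url, scheme=OPC_TCP_SCHEME):
    """Prefix url with scheme:// unless it already names a scheme."""
    if url and "://" not in url:
        return scheme + "://" + url
    return url


print(with_default_scheme("localhost:4840"))
print(with_default_scheme("opc.tcp://example.org:4840"))
```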
100
101
102
async def get_node(client, args):
103
    node = client.get_node(args.nodeid)
104
    if args.path:
105
        path = args.path.split(",")
106
        if node.nodeid == ua.NodeId(84, 0) and path[0] == "0:Root":
107
            # let user specify root if not node given
108
            path = path[1:]
109
        node = await node.get_child(path)
110
    return node
111
112
113
def uaread_value():
114
    run(_uaread_value())
115
116
117
async def _uaread_value():
118
    parser = argparse.ArgumentParser(description="Read attribute of a node, per default reads value of a node")
Coding Style introduced by
This line is too long as per the coding-style (111/100).

This check looks for lines that are too long. You can specify the maximum line length.

119
    add_common_args(parser)
120
    parser.add_argument("-a",
121
                        "--attribute",
122
                        dest="attribute",
123
                        type=int,
124
                        default=ua.AttributeIds.Value,
125
                        help="Set attribute to read")
126
    parser.add_argument("-t",
127
                        "--datatype",
128
                        dest="datatype",
129
                        default="python",
130
                        choices=['python', 'variant', 'datavalue'],
131
                        help="Data type to return")
132
133
    args = parse_args(parser, requirenodeid=True)
134
135
    client = Client(args.url, timeout=args.timeout)
136
    await _configure_client_with_args(client, args)
137
    await client.connect()
138
139
    try:
140
        node = await get_node(client, args)
141
        attr = await node.get_attribute(args.attribute)
142
        if args.datatype == "python":
143
            print(attr.Value.Value)
144
        elif args.datatype == "variant":
145
            print(attr.Value)
146
        else:
147
            print(attr)
148
    finally:
149
        await client.disconnect()
150
151
152
def _args_to_array(val, array):
153
    if array == "guess":
154
        if "," in val:
155
            array = "true"
156
    if array == "true":
157
        val = val.split(",")
158
    return val
159
160
161
def _arg_to_bool(val):
162
    return val in ("true", "True")
163
164
165
def _arg_to_variant(val, array, ptype, varianttype=None):
166
    val = _args_to_array(val, array)
167
    if isinstance(val, list):
168
        val = [ptype(i) for i in val]
169
    else:
170
        val = ptype(val)
171
    if varianttype:
172
        return ua.Variant(val, varianttype)
173
    else:
174
        return ua.Variant(val)
175
176
177
def _val_to_variant(val, args):
178
    array = args.array
179
    if args.datatype == "guess":
180
        if val in ("true", "True", "false", "False"):
181
            return _arg_to_variant(val, array, _arg_to_bool)
182
        try:
183
            return _arg_to_variant(val, array, int)
184
        except ValueError:
185
            try:
186
                return _arg_to_variant(val, array, float)
187
            except ValueError:
188
                return _arg_to_variant(val, array, str)
189
    elif args.datatype == "bool":
190
        if val in ("1", "True", "true"):
191
            return ua.Variant(True, ua.VariantType.Boolean)
192
        else:
193
            return ua.Variant(False, ua.VariantType.Boolean)
194
    elif args.datatype == "sbyte":
195
        return _arg_to_variant(val, array, int, ua.VariantType.SByte)
196
    elif args.datatype == "byte":
197
        return _arg_to_variant(val, array, int, ua.VariantType.Byte)
198
    #elif args.datatype == "uint8":
199
        #return _arg_to_variant(val, array, int, ua.VariantType.Byte)
200
    elif args.datatype == "uint16":
201
        return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
202
    elif args.datatype == "uint32":
203
        return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
204
    elif args.datatype == "uint64":
205
        return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
206
    #elif args.datatype == "int8":
207
        #return ua.Variant(int(val), ua.VariantType.Int8)
208
    elif args.datatype == "int16":
209
        return _arg_to_variant(val, array, int, ua.VariantType.Int16)
210
    elif args.datatype == "int32":
211
        return _arg_to_variant(val, array, int, ua.VariantType.Int32)
212
    elif args.datatype == "int64":
213
        return _arg_to_variant(val, array, int, ua.VariantType.Int64)
214
    elif args.datatype == "float":
215
        return _arg_to_variant(val, array, float, ua.VariantType.Float)
216
    elif args.datatype == "double":
217
        return _arg_to_variant(val, array, float, ua.VariantType.Double)
218
    elif args.datatype == "string":
219
        return _arg_to_variant(val, array, str, ua.VariantType.String)
220
    elif args.datatype == "datetime":
221
        raise NotImplementedError
222
    elif args.datatype == "Guid":
223
        return _arg_to_variant(val, array, bytes, ua.VariantType.Guid)
224
    elif args.datatype == "bytestring":
225
        return _arg_to_variant(val, array, str.encode, ua.VariantType.ByteString)
226
    elif args.datatype == "xmlelement":
227
        return _arg_to_variant(val, array, str, ua.VariantType.XmlElement)
228
    elif args.datatype == "nodeid":
229
        return _arg_to_variant(val, array, ua.NodeId.from_string, ua.VariantType.NodeId)
230
    elif args.datatype == "expandednodeid":
231
        return _arg_to_variant(val, array, ua.ExpandedNodeId.from_string, ua.VariantType.ExpandedNodeId)
Coding Style introduced by
This line is too long as per the coding-style (104/100).

This check looks for lines that are too long. You can specify the maximum line length.

232
    elif args.datatype == "statuscode":
233
        return _arg_to_variant(val, array, int, ua.VariantType.StatusCode)
234
    elif args.datatype in ("qualifiedname", "browsename"):
235
        return _arg_to_variant(val, array, ua.QualifiedName.from_string, ua.VariantType.QualifiedName)
Coding Style introduced by
This line is too long as per the coding-style (102/100).

This check looks for lines that are too long. You can specify the maximum line length.

236
    elif args.datatype == "localizedtext":
237
        return _arg_to_variant(val, array, ua.LocalizedText, ua.VariantType.LocalizedText)
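
The "guess" branch at the top of _val_to_variant() tries bool, then int, then float, then str, and applies one type to the whole value or list. A minimal sketch of that cascade without asyncua follows; `guess_values()` is a hypothetical helper that returns plain Python values instead of ua.Variant objects.

```python
def guess_values(text):
    """Guess bool, int, float or str for a command-line value, in that order.

    Comma-separated input is treated as a list, and one type is applied
    to every element, as in the listing's array "guess" mode.
    """
    if text in ("true", "True", "false", "False"):
        return text in ("true", "True")
    items = text.split(",") if "," in text else text
    for ptype in (int, float, str):
        try:
            if isinstance(items, list):
                return [ptype(item) for item in items]
            return ptype(items)
        except ValueError:
            continue  # str() never raises, so the loop always returns


print(guess_values("3"))
print(guess_values("1,2.5"))
print(guess_values("a,1"))
```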
238
239
240
async def _configure_client_with_args(client, args):
241
    if args.user:
242
        client.set_user(args.user)
243
    if args.password:
244
        client.set_password(args.password)
245
    await client.set_security_string(args.security)
246
247
248
def uawrite_value():
249
    run(_uawrite_value())
250
251
252
async def _uawrite_value():
253
    parser = argparse.ArgumentParser(description="Write attribute of a node, per default write value of node")
Coding Style introduced by
This line is too long as per the coding-style (110/100).

This check looks for lines that are too long. You can specify the maximum line length.

254
    add_common_args(parser)
255
    parser.add_argument("-a",
256
                        "--attribute",
257
                        dest="attribute",
258
                        type=int,
259
                        default=ua.AttributeIds.Value,
260
                        help="Set attribute to write")
261
    parser.add_argument("-l",
262
                        "--list",
263
                        "--array",
264
                        dest="array",
265
                        default="guess",
266
                        choices=["guess", "true", "false"],
267
                        help="Value is an array")
268
    parser.add_argument("-t",
269
                        "--datatype",
270
                        dest="datatype",
271
                        default="guess",
272
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
Coding Style introduced by
This line is too long as per the coding-style (293/100).

This check looks for lines that are too long. You can specify the maximum line length.

273
                        help="Data type of the value to write")
274
    parser.add_argument("value",
275
                        help="Value to be written",
276
                        metavar="VALUE")
277
    args = parse_args(parser, requirenodeid=True)
278
279
    client = Client(args.url, timeout=args.timeout)
280
    await _configure_client_with_args(client, args)
281
    await client.connect()
282
    try:
283
        node = await get_node(client, args)
284
        val = _val_to_variant(args.value, args)
285
        await node.set_attribute(args.attribute, ua.DataValue(val))
286
    finally:
287
        await client.disconnect()
288
289
290
def uals():
291
    run(_uals())
292
293
294
async def _uals():
295
    parser = argparse.ArgumentParser(description="Browse OPC-UA node and print result")
296
    add_common_args(parser)
297
    parser.add_argument("-l",
298
                        dest="long_format",
299
                        const=3,
300
                        nargs="?",
301
                        type=int,
302
                        help="use a long listing format")
303
    parser.add_argument("-d",
304
                        "--depth",
305
                        default=1,
306
                        type=int,
307
                        help="Browse depth")
308
309
    args = parse_args(parser)
310
    if args.long_format is None:
311
        args.long_format = 1
312
313
    client = Client(args.url, timeout=args.timeout)
314
    await _configure_client_with_args(client, args)
315
    await client.connect()
316
    try:
317
        node = await get_node(client, args)
318
        print("Browsing node {0} at {1}\n".format(node, args.url))
319
        if args.long_format == 0:
320
            await _lsprint_0(node, args.depth - 1)
321
        elif args.long_format == 1:
322
            await _lsprint_1(node, args.depth - 1)
323
        else:
324
            await _lsprint_long(node, args.depth - 1)
325
    finally:
326
        await client.disconnect()
327
328
329
async def _lsprint_0(node, depth, indent=""):
330
    if not indent:
331
        print("{0:30} {1:25}".format("DisplayName", "NodeId"))
332
        print("")
333
    for desc in await node.get_children_descriptions():
334
        print("{0}{1:30} {2:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string()))
Coding Style introduced by
This line is too long as per the coding-style (103/100).

This check looks for lines that are too long. You can specify the maximum line length.

335
        if depth:
336
            await _lsprint_0(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
337
338
339
async def _lsprint_1(node, depth, indent=""):
340
    if not indent:
341
        print("{0:30} {1:25} {2:25} {3:25}".format("DisplayName", "NodeId", "BrowseName", "Value"))
342
        print("")
343
344
    for desc in await node.get_children_descriptions():
345
        if desc.NodeClass == ua.NodeClass.Variable:
346
            try:
347
                val = await Node(node.server, desc.NodeId).read_value()
348
            except UaStatusCodeError as err:
349
                val = "Bad (0x{0:x})".format(err.code)
350
            print("{0}{1:30} {2!s:25} {3!s:25}, {4!s:3}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string(), val))
Coding Style introduced by
This line is too long as per the coding-style (161/100).

This check looks for lines that are too long. You can specify the maximum line length.

351
        else:
352
            print("{0}{1:30} {2!s:25} {3!s:25}".format(indent, desc.DisplayName.to_string(), desc.NodeId.to_string(), desc.BrowseName.to_string()))
Coding Style introduced by
This line is too long as per the coding-style (147/100).

This check looks for lines that are too long. You can specify the maximum line length.

353
        if depth:
354
            await _lsprint_1(Node(node.server, desc.NodeId), depth - 1, indent + "  ")
355
356
357
async def _lsprint_long(pnode, depth, indent=""):
358
    if not indent:
359
        print("{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
Coding Style introduced by
This line is too long as per the coding-style (138/100).

This check looks for lines that are too long. You can specify the maximum line length.

360
        print("")
361
    for node in await pnode.get_children():
362
        attrs = await node.get_attributes([ua.AttributeIds.DisplayName,
363
                                           ua.AttributeIds.BrowseName,
364
                                           ua.AttributeIds.NodeClass,
365
                                           ua.AttributeIds.WriteMask,
366
                                           ua.AttributeIds.UserWriteMask,
367
                                           ua.AttributeIds.DataType,
368
                                           ua.AttributeIds.Value])
369
        name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
370
        update = attrs[-1].ServerTimestamp
371
        if nclass == ua.NodeClass.Variable:
372
            print("{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
Coding Style introduced by
This line is too long as per the coding-style (178/100).

This check looks for lines that are too long. You can specify the maximum line length.

373
        else:
374
            print("{0}{1:30} {2:25} {3:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string()))
Coding Style introduced by
This line is too long as per the coding-style (121/100).

This check looks for lines that are too long. You can specify the maximum line length.

375
        if depth:
376
            await _lsprint_long(node, depth - 1, indent + "  ")
377
378
379
class SubHandler(object):
380
381
    def datachange_notification(self, node, val, data):
382
        print("New data change event", node, val, data)
383
384
    def event_notification(self, event):
385
        print("New event", event)
386
387
388
def uasubscribe():
389
    run(_uasubscribe())
390
391
392
async def _uasubscribe():
393
    parser = argparse.ArgumentParser(description="Subscribe to a node and print results")
394
    add_common_args(parser)
395
    parser.add_argument("-t",
396
                        "--eventtype",
397
                        dest="eventtype",
398
                        default="datachange",
399
                        choices=['datachange', 'event'],
400
                        help="Event type to subscribe to")
401
402
    args = parse_args(parser, requirenodeid=False)
403
    if args.eventtype == "datachange":
404
        _require_nodeid(parser, args)
405
    else:
406
        # FIXME: this is broken, someone may have written i=84 on purpose
Coding Style introduced by
TODO and FIXME comments should generally be avoided.
407
        if args.nodeid == "i=84" and args.path == "":
408
            args.nodeid = "i=2253"
409
410
    client = Client(args.url, timeout=args.timeout)
411
    await _configure_client_with_args(client, args)
412
    await client.connect()
413
    try:
414
        node = await get_node(client, args)
415
        handler = SubHandler()
416
        sub = await client.create_subscription(500, handler)
417
        if args.eventtype == "datachange":
418
            await sub.subscribe_data_change(node)
419
        else:
420
            await sub.subscribe_events(node)
421
        print("Press Ctrl-C to exit")
422
        while True:
423
            await asyncio.sleep(1)
424
    finally:
425
        await client.disconnect()
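
Inside a coroutine, the way the wait loop sleeps matters: time.sleep() blocks the event loop, so other tasks on the same loop cannot run while it sleeps, whereas awaiting asyncio.sleep() hands control back to the loop. A minimal sketch, with a plain counter task standing in for whatever background work a subscription needs; `ticker()` and `wait_loop()` are hypothetical names.

```python
import asyncio


async def ticker(counter):
    """Stand-in for background work that needs the event loop to make progress."""
    while True:
        counter.append(1)
        await asyncio.sleep(0.01)


async def wait_loop(rounds):
    """Wait cooperatively and report how often the background task ran."""
    counter = []
    task = asyncio.ensure_future(ticker(counter))
    try:
        for _ in range(rounds):
            await asyncio.sleep(0.05)  # yields to the loop, unlike time.sleep()
    finally:
        task.cancel()
    return len(counter)


ticks = asyncio.run(wait_loop(3))
print(ticks > 1)
```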
426
427
428
def application_to_strings(app):
429
    result = []
430
    result.append(('Application URI', app.ApplicationUri))
431
    optionals = [
432
        ('Product URI', app.ProductUri),
433
        ('Application Name', app.ApplicationName.to_string()),
434
        ('Application Type', str(app.ApplicationType)),
435
        ('Gateway Server URI', app.GatewayServerUri),
436
        ('Discovery Profile URI', app.DiscoveryProfileUri),
437
    ]
438
    for (n, v) in optionals:
439
        if v:
440
            result.append((n, v))
441
    for url in app.DiscoveryUrls:
442
        result.append(('Discovery URL', url))
443
    return result  # ['{}: {}'.format(n, v) for (n, v) in result]
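
The function above always lists the application URI and includes the optional fields only when they are non-empty. A reduced sketch of that pattern, assuming a stub object in place of a real application description; `describe_application()` and the field values are hypothetical.

```python
from types import SimpleNamespace


def describe_application(app):
    """List (label, value) pairs, skipping optional fields that are empty."""
    result = [("Application URI", app.ApplicationUri)]
    optionals = [
        ("Product URI", app.ProductUri),
        ("Gateway Server URI", app.GatewayServerUri),
    ]
    result.extend((label, value) for label, value in optionals if value)
    result.extend(("Discovery URL", url) for url in app.DiscoveryUrls)
    return result


# Hypothetical stand-in for an application description; the values are made up.
app = SimpleNamespace(
    ApplicationUri="urn:example:server",
    ProductUri="",
    GatewayServerUri=None,
    DiscoveryUrls=["opc.tcp://localhost:4840"],
)

for label, value in describe_application(app):
    print("  {0}: {1}".format(label, value))
```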
444
445
446
def cert_to_string(der):
447
    if not der:
448
        return '[no certificate]'
449
    try:
450
        from ..crypto import uacrypto
451
    except ImportError:
452
        return "{0} bytes".format(len(der))
453
    cert = uacrypto.x509_from_der(der)
454
    return uacrypto.x509_to_string(cert)
455
456
457
def endpoint_to_strings(ep):
458
    result = [('Endpoint URL', ep.EndpointUrl)]
459
    result += application_to_strings(ep.Server)
460
    result += [
461
        ('Server Certificate', cert_to_string(ep.ServerCertificate)),
462
        ('Security Mode', str(ep.SecurityMode)),
463
        ('Security Policy URI', ep.SecurityPolicyUri)]
464
    for tok in ep.UserIdentityTokens:
465
        result += [
466
            ('User policy', tok.PolicyId),
467
            ('  Token type', str(tok.TokenType))]
468
        if tok.IssuedTokenType or tok.IssuerEndpointUrl:
469
            result += [
470
                ('  Issued Token type', tok.IssuedTokenType),
471
                ('  Issuer Endpoint URL', tok.IssuerEndpointUrl)]
472
        if tok.SecurityPolicyUri:
473
            result.append(('  Security Policy URI', tok.SecurityPolicyUri))
474
    result += [
475
        ('Transport Profile URI', ep.TransportProfileUri),
476
        ('Security Level', ep.SecurityLevel)]
477
    return result
478
479
480
def uaclient():
481
    parser = argparse.ArgumentParser(description="Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
Coding Style introduced by
This line is too long as per the coding-style (195/100).

This check looks for lines that are too long. You can specify the maximum line length.

482
    add_common_args(parser)
483
    parser.add_argument("-c",
484
                        "--certificate",
485
                        help="set client certificate")
486
    parser.add_argument("-k",
487
                        "--private_key",
488
                        help="set client private key")
489
    args = parse_args(parser)
490
491
    client = sync.Client(args.url, timeout=args.timeout)
492
    _configure_client_with_args(client, args)
493
    if args.certificate:
494
        client.load_client_certificate(args.certificate)
495
    if args.private_key:
496
        client.load_private_key(args.private_key)
497
    
Coding Style introduced by
Trailing whitespace
498
    sync.start_thread_loop()
499
    client.connect()
500
    try:
501
        mynode = get_node(client, args)
502
        embed()
503
        client.disconnect()
504
    finally:
505
        sync.stop_thread_loop()
506
    sys.exit(0)
507
508
509
def uaserver():
510
    parser = argparse.ArgumentParser(description="Run an example OPC-UA server. By importing xml definition and using uawrite command line, it is even possible to expose real data using this server")
Coding Style introduced by
This line is too long as per the coding-style (199/100).

This check looks for lines that are too long. You can specify the maximum line length.

511
    # we setup a server, this is a bit different from other tool so we do not reuse common arguments
512
    parser.add_argument("-u",
513
                        "--url",
514
                        help="URL of OPC UA server, default is opc.tcp://0.0.0.0:4840",
515
                        default='opc.tcp://0.0.0.0:4840',
516
                        metavar="URL")
517
    parser.add_argument("-v",
518
                        "--verbose",
519
                        dest="loglevel",
520
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
521
                        default='WARNING',
522
                        help="Set log level")
523
    parser.add_argument("-x",
524
                        "--xml",
525
                        metavar="XML_FILE",
526
                        help="Populate address space with nodes defined in XML")
527
    parser.add_argument("-p",
528
                        "--populate",
529
                        action="store_true",
530
                        help="Populate address space with some sample nodes")
531
    parser.add_argument("-c",
532
                        "--disable-clock",
533
                        action="store_true",
534
                        help="Disable clock, to avoid seeing many write if debugging an application")
Coding Style introduced by
This line is too long as per the coding-style (101/100).

This check looks for lines that are too long. You can specify the maximum line length.

535
    parser.add_argument("-s",
536
                        "--shell",
537
                        action="store_true",
538
                        help="Start python shell instead of randomly changing node values")
539
    parser.add_argument("--certificate",
540
                        help="set server certificate")
541
    parser.add_argument("--private_key",
542
                        help="set server private key")
543
    args = parser.parse_args()
544
    logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
545
546
    sync.start_thread_loop()
547
    server = sync.Server()
548
    server.set_endpoint(args.url)
549
    if args.certificate:
550
        server.load_certificate(args.certificate)
551
    if args.private_key:
552
        server.load_private_key(args.private_key)
553
    server.disable_clock(args.disable_clock)
554
    server.set_server_name("FreeOpcUa Example Server")
555
    if args.xml:
556
        server.import_xml(args.xml)
557
    if args.populate:
558
        @uamethod
559
        def multiply(parent, x, y):
560
            print("multiply method call with parameters: ", x, y)
561
            return x * y
562
563
        uri = "http://examples.freeopcua.github.io"
564
        idx = server.register_namespace(uri)
565
        objects = server.get_objects_node()
566
        myobj = objects.add_object(idx, "MyObject")
567
        mywritablevar = myobj.add_variable(idx, "MyWritableVariable", 6.7)
568
        mywritablevar.set_writable()    # Set MyWritableVariable to be writable by clients
569
        myvar = myobj.add_variable(idx, "MyVariable", 6.7)
570
        myarrayvar = myobj.add_variable(idx, "MyVarArray", [6.7, 7.9])
571
        myprop = myobj.add_property(idx, "MyProperty", "I am a property")
572
        mymethod = myobj.add_method(idx, "MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])
Coding Style introduced by
This line is too long as per the coding-style (134/100).

This check looks for lines that are too long. You can specify the maximum line length.

573
574
    server.start()
575
    try:
576
        if args.shell:
577
            embed()
578
        elif args.populate:
579
            count = 0
580
            while True:
581
                time.sleep(1)
582
                myvar.write_value(math.sin(count / 10))
The variable myvar does not seem to be defined in case args.populate on line 557 is False. Are you sure this can never be the case?
583
                myarrayvar.write_value([math.sin(count / 10), math.sin(count / 100)])
The variable myarrayvar does not seem to be defined in case args.populate on line 557 is False. Are you sure this can never be the case?
584
                count += 1
585
        else:
586
            while True:
587
                time.sleep(1)
588
        server.stop()
589
    finally:
590
        sync.stop_thread_loop()
591
    sys.exit(0)
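
The two warnings about `myvar` and `myarrayvar` arise because those names are only bound inside `if args.populate:`. In the listing the later use sits under `elif args.populate:`, so the warning looks like a false positive, but binding the name on every path makes that visible to a static checker. A minimal sketch of the pattern follows; `FakeVariable` and `update_loop` are hypothetical stand-ins for illustration and are not part of asyncua.

```python
import math


class FakeVariable:
    """Stand-in for a server variable node (hypothetical, illustration only)."""

    def __init__(self):
        self.values = []

    def write_value(self, value):
        self.values.append(value)


def update_loop(populate, steps):
    # Bind the name on every code path so it is always defined.
    myvar = FakeVariable() if populate else None
    for count in range(steps):
        # Guard the use explicitly instead of relying on a matching branch.
        if myvar is not None:
            myvar.write_value(math.sin(count / 10))
    return myvar
```

With `populate=False` the loop simply does nothing to the variable, which mirrors the plain `time.sleep(1)` branch in the listing.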
592
593
594
def uadiscover():
595
    run(_uadiscover())
596
597
598
async def _uadiscover():
599
    parser = argparse.ArgumentParser(description="Performs OPC UA discovery and prints information on servers and endpoints.")
Coding Style: This line is too long as per the coding-style (126/100).
600
    add_minimum_args(parser)
601
    parser.add_argument("-n",
602
                        "--network",
603
                        action="store_true",
604
                        help="Also send a FindServersOnNetwork request to server")
605
    #parser.add_argument("-s",
606
                        #"--servers",
607
                        #action="store_false",
608
                        #help="send a FindServers request to server")
609
    #parser.add_argument("-e",
610
                        #"--endpoints",
611
                        #action="store_false",
612
                        #help="send a GetEndpoints request to server")
613
    args = parse_args(parser)
614
615
    client = Client(args.url, timeout=args.timeout)
616
617
    if args.network:
618
        print("Performing discovery at {0}\n".format(args.url))
619
        for i, server in enumerate(await client.connect_and_find_servers_on_network(), start=1):
620
            print('Server {0}:'.format(i))
621
            #for (n, v) in application_to_strings(server):
622
                #print('  {}: {}'.format(n, v))
623
            print('')
624
625
    print("Performing discovery at {0}\n".format(args.url))
626
    for i, server in enumerate(await client.connect_and_find_servers(), start=1):
627
        print('Server {0}:'.format(i))
628
        for (n, v) in application_to_strings(server):
629
            print('  {0}: {1}'.format(n, v))
630
        print('')
631
632
    for i, ep in enumerate(await client.connect_and_get_server_endpoints(), start=1):
633
        print('Endpoint {0}:'.format(i))
634
        for (n, v) in endpoint_to_strings(ep):
635
            print('  {0}: {1}'.format(n, v))
636
        print('')
637
638
    sys.exit(0)
639
640
641
def print_history(o):
642
    print("{0:30} {1:10} {2}".format('Source timestamp', 'Status', 'Value'))
643
    for d in o:
644
        print("{0:30} {1:10} {2}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value.Value))
645
646
647
def str_to_datetime(s, default=None):
648
    if not s:
649
        if default is not None:
650
            return default
651
        return datetime.utcnow()
652
    # FIXME: try different datetime formats
Coding Style: TODO and FIXME comments should generally be avoided.
653
    for fmt in ["%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"]:
654
        try:
655
            return datetime.strptime(s, fmt)
656
        except ValueError:
657
            pass
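
As written, `str_to_datetime` returns `None` implicitly when none of the formats match, so a malformed `--starttime` or `--endtime` passes through silently. One possible way to resolve the FIXME is to fail loudly. The sketch below assumes that raising `ValueError` is acceptable to callers; `str_to_datetime_strict` and `DATETIME_FORMATS` are hypothetical names, not part of the module.

```python
from datetime import datetime, timezone

# Formats matching the help text: YYYY-MM-DD [HH:MM[:SS]]
DATETIME_FORMATS = ("%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S")


def str_to_datetime_strict(s, default=None):
    """Parse s with the known formats; raise ValueError if none matches."""
    if not s:
        if default is not None:
            return default
        # Naive UTC timestamp, same value datetime.utcnow() would give.
        return datetime.now(timezone.utc).replace(tzinfo=None)
    for fmt in DATETIME_FORMATS:
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            continue  # try the next format
    raise ValueError(
        "Cannot parse {0!r}; expected one of: {1}".format(s, ", ".join(DATETIME_FORMATS))
    )
```

Whether to raise or to fall back to the default is a design choice; raising surfaces typos early, at the cost of making the tool exit on bad input.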
658
659
660
def uahistoryread_value():
661
    run(_uahistoryread_value())
662
663
664
async def _uahistoryread_value():
665
    parser = argparse.ArgumentParser(description="Read history of a node")
666
    add_common_args(parser)
667
    parser.add_argument("--starttime",
668
                        default=None,
669
                        help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time - one day")
Coding Style: This line is too long as per the coding-style (113/100).
670
    parser.add_argument("--endtime",
671
                        default=None,
672
                        help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
Coding Style: This line is too long as per the coding-style (101/100).
673
    parser.add_argument("-e",
674
                        "--events",
675
                        action="store_true",
676
                        help="Read event history instead of data change history")
677
    parser.add_argument("-l",
678
                        "--limit",
679
                        type=int,
680
                        default=10,
681
                        help="Maximum number of notifications to return")
682
683
    args = parse_args(parser, requirenodeid=True)
684
685
    client = Client(args.url, timeout=args.timeout)
686
    await _configure_client_with_args(client, args)
687
    await client.connect()
688
    try:
689
        node = await get_node(client, args)
690
        starttime = str_to_datetime(args.starttime, datetime.utcnow() - timedelta(days=1))
691
        endtime = str_to_datetime(args.endtime, datetime.utcnow())
692
        print("Reading raw history of node {0} at {1}; start at {2}, end at {3}\n".format(node, args.url, starttime, endtime))
Coding Style: This line is too long as per the coding-style (126/100).
693
        if args.events:
694
            evs = await node.read_event_history(starttime, endtime, numvalues=args.limit)
695
            for ev in evs:
696
                print(ev)
697
        else:
698
            print_history(await node.read_raw_history(starttime, endtime, numvalues=args.limit))
699
    finally:
700
        await client.disconnect()
701
    sys.exit(0)
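
In this async port the node methods are coroutines (the event branch already awaits `read_event_history`), so the result of `read_raw_history` has to be awaited before `print_history` can iterate over it. The sketch below shows the difference with a stand-in coroutine; `read_raw_history_stub` is hypothetical and only mimics an async call that returns a list.

```python
import asyncio


async def read_raw_history_stub(limit):
    """Stand-in coroutine (hypothetical); returns a small list after yielding."""
    await asyncio.sleep(0)
    return list(range(limit))


async def main():
    pending = read_raw_history_stub(3)       # no await: a coroutine object, not a list
    is_coro = asyncio.iscoroutine(pending)
    values = await pending                    # awaiting yields the actual result
    return is_coro, values


result = asyncio.run(main())
print(result)  # (True, [0, 1, 2])
```

Passing the un-awaited object to a `for` loop would raise a `TypeError`, because a coroutine object is not iterable.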
702
703
704
def uacall():
705
    run(_uacall())
706
707
708
async def _uacall():
709
    parser = argparse.ArgumentParser(description="Call method of a node")
710
    add_common_args(parser)
711
    parser.add_argument("-m",
712
                        "--method",
713
                        dest="method",
714
                        type=str,
715
                        default=None,
716
                        help="browse name of method to call")
717
    parser.add_argument("-t",
718
                        "--datatype",
719
                        dest="datatype",
720
                        default="guess",
721
                        choices=["guess", "byte", "sbyte", "nodeid", "expandednodeid", "qualifiedname", "browsename", "string", "float", "double", "int16", "int32", "int64", "uint16", "uint32", "uint64", "bool", "datetime", "bytestring", "xmlelement", "statuscode", "localizedtext"],
Coding Style: This line is too long as per the coding-style (293/100).
722
                        help="Data type to return")
723
    parser.add_argument("-l",
724
                        "--list",
725
                        "--array",
726
                        dest="array",
727
                        default="guess",
728
                        choices=["guess", "true", "false"],
729
                        help="Value is an array")
730
    parser.add_argument("value",
731
                        help="Comma separated value(s) to use for call to method, if any",
732
                        nargs="?",
733
                        metavar="VALUE")
734
735
    args = parse_args(parser, requirenodeid=True)
736
737
    client = Client(args.url, timeout=args.timeout)
738
    await _configure_client_with_args(client, args)
739
    await client.connect()
740
    try:
741
        node = await get_node(client, args)
742
        if args.value is None:
743
            val = ()  # empty tuple
744
        else:
745
            val = args.value.split(",")
746
            val = [_val_to_variant(v, args) for v in val]
747
748
        method_id = None
749
750
        if args.method is not None:
751
            method_id = args.method
752
        else:
753
            methods = await node.get_methods()
754
            if len(methods) == 0:
755
                raise ValueError("No methods in selected node and no method given")
756
            else:
757
                method_id = methods[0]
758
        result = await node.call_method(method_id, *val)
759
        print(f"resulting result_variants={result}")
760
    finally:
761
        await client.disconnect()
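
The 293-character `choices=` line flagged in `_uacall` could be shortened by moving the list into a module-level constant. The following is a minimal sketch of that refactoring, not the project's actual layout; `DATATYPE_CHOICES` and `build_parser` are hypothetical names, and only the `--datatype` option is reproduced.

```python
import argparse

# Accepted values for --datatype, wrapped to stay under the line-length limit.
DATATYPE_CHOICES = (
    "guess", "byte", "sbyte", "nodeid", "expandednodeid", "qualifiedname",
    "browsename", "string", "float", "double", "int16", "int32", "int64",
    "uint16", "uint32", "uint64", "bool", "datetime", "bytestring",
    "xmlelement", "statuscode", "localizedtext",
)


def build_parser():
    parser = argparse.ArgumentParser(description="Call method of a node")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="guess",
                        choices=DATATYPE_CHOICES,
                        help="Data type to return")
    return parser


args = build_parser().parse_args(["-t", "int32"])
print(args.datatype)  # int32
```

A shared constant also lets other commands that accept a data type reuse the same list instead of repeating it.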
762
763
764
def uageneratestructs():
765
    run(_uageneratestructs())
766
767
768
async def _uageneratestructs():
769
    parser = argparse.ArgumentParser(description="Generate a Python module from the xml structure definition (.bsd), the node argument is typically a child of i=93")
Coding Style: This line is too long as per the coding-style (168/100).
770
    add_common_args(parser, require_node=True)
771
    parser.add_argument("-o",
772
                        "--output",
773
                        dest="output_path",
774
                        required=True,
775
                        type=str,
776
                        default=None,
777
                        help="The python file to be generated.",
778
                        )
Coding Style: Wrong continued indentation (remove 1 space).
779
    args = parse_args(parser, requirenodeid=True)
780
781
    client = Client(args.url, timeout=args.timeout)
782
    await _configure_client_with_args(client, args)
783
    await client.connect()
784
    try:
785
        node = await get_node(client, args)
786
        generators, _ = await client.load_type_definitions([node])
787
        generators[0].save_to_file(args.output_path, True)
788
    finally:
789
        await client.disconnect()
790